At my current job, a new requirement came in:
Connect to an FTP Server and check if a file exists. We don’t know when the file is going to be uploaded. When the file is uploaded, notify a process with the file contents.
How should we approach this?
Table of contents
- Intro
- Erlang’s :ftp module
- FtpClient API
- Global name registry
- GenServer state
- Starting an FTP connection
- Fetching files from the FTP server
- :gen_tcp active/passive modes
- Get FTP file asynchronously
- Multiple caller pids for the same file name
- File name hash function
- Full implementation
- IEx dry-run
- Using the FtpClient in a Phoenix application
- Notes
Intro
Erlang and Elixir provide us with libraries and abstractions to solve this requirement. Erlang already ships the `:ftp` module, which lets us connect to an FTP Server (optionally over TLS). Elixir gives us the `GenServer` abstraction, which lets us build a long-lived process that queries the FTP Server and checks whether the file has been uploaded.
We’ll build the feature incrementally, exploring the pros and cons of our decisions.
Erlang’s :ftp module
Erlang’s `:ftp` module is built around simplicity. Here are the functions we’ll use:
- `:ftp.open/2`. Open an FTP connection.
- `:ftp.user/3`. Authenticate a user with the FTP Server.
- `:ftp.ls/1`. List files.
- `:ftp.recv_bin/2`. Download a file into memory as a binary.
- `:ftp.send/2`. Upload a file to the FTP Server.
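The following sketch shows how these calls fit together outside a GenServer. It opens a connection, authenticates, downloads one file, and always closes the connection. The `FtpOneShot` module and its `fetch_once/5` function are hypothetical and are not part of the post's `FtpClient`. The sketch assumes the `:ftp` application is available and that the port is an integer.

```elixir
defmodule FtpOneShot do
  # Hypothetical helper (not part of FtpClient): download a single file once.
  # Returns {:ok, binary} or the first {:error, reason} encountered.
  def fetch_once(host, port, username, password, remote_file) do
    with {:ok, pid} <- :ftp.open(to_charlist(host), port: port) do
      try do
        with :ok <- :ftp.user(pid, to_charlist(username), to_charlist(password)) do
          :ftp.recv_bin(pid, to_charlist(remote_file))
        end
      after
        # Close the connection whether the download succeeded or not
        :ftp.close(pid)
      end
    end
  end
end
```

Polling with this helper would mean opening and closing a connection on every attempt. The GenServer we build next avoids that by keeping a single connection open.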
FtpClient API
Let’s start by defining the API the module will expose. The functions `ls` and `send_file` are exposed so we can debug the FTP connection.
defmodule FtpClient do
  @moduledoc """
  GenServer to poll an FTP Server for files.
  """
  use GenServer

  ## API

  @doc """
  Lists FTP contents.
  """
  def ls do
  end

  @doc """
  Send file to FTP Server.
  """
  def send_file(file_name) do
  end

  @doc """
  Get an FTP file asynchronously by name. The result will be sent to the
  calling process.
  """
  def get_file(from_pid, file_name) when is_pid(from_pid) do
  end
end
Now that we’ve defined the API, let’s fill in the implementation.
defmodule FtpClient do
  @moduledoc """
  GenServer to poll an FTP Server for files.
  """
  use GenServer

  ## API

  @doc """
  Lists FTP contents.
  """
  def ls do
    GenServer.call({:global, __MODULE__}, :ls)
  end

  @doc """
  Send file to FTP Server.
  """
  def send_file(file_name) do
    GenServer.call({:global, __MODULE__}, {:send_file, to_charlist(file_name)})
  end

  @doc """
  Get an FTP file asynchronously by name. The result will be sent to the
  calling process.
  """
  def get_file(from_pid, file_name) when is_pid(from_pid) do
    GenServer.cast({:global, __MODULE__}, {:get_file, from_pid, to_charlist(file_name)})
  end
end
Global name registry
Note that we are using `{:global, __MODULE__}` as the name of the `GenServer`. Depending on your situation, you might choose a different name registration strategy. These strategies are listed in the official docs.
I’ve chosen `{:global, term()}` because our Elixir nodes are clustered. When we use a global process in clustered nodes, we need to be aware of split-brain scenarios and other network synchronization issues. See this great post about the drawbacks of a single global process.
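For reference, `GenServer` and `Agent` accept three kinds of names:
- an atom, for local registration
- `{:global, term}`
- `{:via, module, term}`

The sketch below registers a hypothetical `GlobalCounter` agent under a `:global` name. `:global` also works on a single, non-distributed node, so you can try it in a plain `iex` session.

```elixir
defmodule GlobalCounter do
  # Hypothetical example process registered under a :global name,
  # the same registration style FtpClient uses.
  use Agent

  def start_link(initial) do
    Agent.start_link(fn -> initial end, name: {:global, __MODULE__})
  end

  # Callers address the process by its global name, not by pid
  def value, do: Agent.get({:global, __MODULE__}, & &1)
end
```

Once the name is taken, a second `start_link/1` returns `{:error, {:already_started, pid}}` instead of starting a duplicate. That uniqueness guarantee is exactly why split-brain scenarios deserve attention.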
To register a global process, we need to pass the `:name` option to `GenServer.start_link/3`. Our `FtpClient.start_link/1` is part of our public API, and it’s defined like this:
def start_link({host, port, username, password}) do
  GenServer.start_link(__MODULE__, {host, port, username, password}, name: {:global, __MODULE__})
end
We accept a single tuple with four elements: `host`, `port`, `username`, and `password`. We’ll use this data to open the FTP connection.
GenServer state
Because we receive the host, port, username, and password required to connect to the FTP Server, we need to store these values in the `GenServer` state. `GenServer.start_link/3` invokes the `init/1` callback of `__MODULE__` in the newly started process.
But… how do we define the `GenServer` state?
The state is whatever value you return from the `GenServer`’s `init/1` callback; it’s commonly a map (`%{}`).
From experience, I’ve learned it’s great to use `defstruct` to define the `GenServer` state: it helps with documentation and lets us specify default values.
The `:requests` key holds the callers’ requests. It is a map whose keys are a hash of the file name and whose values are tuples of the form `{caller_pid, file_name}`. We’ll use the `caller_pid` (caller process identifier) to send the file contents once the file is available on the FTP Server.
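To make that documentation benefit concrete, here is a sketch of the same struct with a type spec describing each field. `FtpClientStateDemo` and the `request_id` type are hypothetical names used only for illustration. The `:socket` field is typed as a pid because `:ftp.open/2` returns the pid of the FTP client process.

```elixir
defmodule FtpClientStateDemo do
  # Hypothetical stand-in for FtpClient's state, with a typespec that
  # documents what lives in each field.
  defstruct [:host, :port, :username, :password, :socket, requests: %{}]

  # Request ids are binary hashes of the file name
  @type request_id :: binary()

  @type t :: %__MODULE__{
          host: charlist() | nil,
          port: :inet.port_number() | nil,
          username: charlist() | nil,
          password: charlist() | nil,
          # :ftp.open/2 returns {:ok, pid}; the post calls this the socket
          socket: pid() | nil,
          requests: %{optional(request_id()) => {pid(), charlist()}}
        }
end
```

Every field without a default starts as `nil`, while `requests` defaults to an empty map, so callbacks can always use `Map` functions on it.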
defmodule FtpClient do
  # ...
  defstruct [:host, :port, :username, :password, :socket, requests: %{}]

  ## API

  def start_link({host, port, username, password}) do
    GenServer.start_link(__MODULE__, {host, port, username, password}, name: {:global, __MODULE__})
  end

  # ... previous API functions

  ## Callbacks

  @impl GenServer
  def init({host, port, username, password}) do
    {:ok,
     %__MODULE__{
       host: to_charlist(host),
       # :ftp.open/2 expects the port as an integer, so it is stored as-is
       port: port,
       username: to_charlist(username),
       password: to_charlist(password)
     }, {:continue, :start_ftp_connection}}
  end
end
The reason I’m calling `to_charlist/1` on the host, username, and password is that Erlang modules work with charlists instead of strings. Since we’ll use the `:ftp` module heavily, I convert the arguments it requires to charlists up front. The port is the exception: `:ftp.open/2` expects it as an integer, and `to_charlist(21)` would turn it into `~c"21"`.
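The string/charlist distinction is easy to see in isolation. Elixir strings are UTF-8 binaries, while Erlang APIs like `:ftp` expect charlists, which are lists of code points. `CharlistDemo` is a hypothetical helper used only for illustration.

```elixir
defmodule CharlistDemo do
  # Hypothetical helper: convert an Elixir string (a binary) into the
  # charlist form that Erlang's :ftp functions expect.
  def host_for_ftp(host) when is_binary(host), do: to_charlist(host)
end
```

For example, `CharlistDemo.host_for_ftp("ftp")` returns `[102, 116, 112]`, which is the same value as `~c"ftp"`.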
What about the third element of the tuple, `{:continue, :start_ftp_connection}`? As described in the `init/1` documentation, returning it lets us specify a callback that is invoked immediately after entering the `GenServer` loop. This callback is `handle_continue/2`, and it’s called with the second element of the tuple as its argument, in this case the `:start_ftp_connection` atom. Because the connection happens after `init/1` returns, `start_link/1` doesn’t block its caller (for example, the supervisor) while we talk to the FTP Server.
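The sketch below mimics this pattern without an FTP Server. `ContinueDemo` is a hypothetical module, and `Process.sleep/1` stands in for `:ftp.open/2` and `:ftp.user/3`. Because `handle_continue/2` runs before the process handles any other message, even the very first call sees the "connected" state.

```elixir
defmodule ContinueDemo do
  # Hypothetical demo: init/1 returns quickly and defers the slow setup
  # to handle_continue/2, the same shape FtpClient uses.
  use GenServer

  def start_link(arg), do: GenServer.start_link(__MODULE__, arg)

  def state(pid), do: GenServer.call(pid, :state)

  @impl GenServer
  def init(arg) do
    # start_link/1 returns as soon as this tuple is returned
    {:ok, %{arg: arg, connected?: false}, {:continue, :connect}}
  end

  @impl GenServer
  def handle_continue(:connect, state) do
    # Stand-in for opening and authenticating the FTP connection
    Process.sleep(50)
    {:noreply, %{state | connected?: true}}
  end

  @impl GenServer
  def handle_call(:state, _from, state), do: {:reply, state, state}
end
```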
Starting an FTP connection
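As you can guess from the name, the `handle_continue/2` callback connects to the FTP Server using the `:ftp` Erlang module and stores the socket in the `GenServer` state for later use. Strictly speaking, `:ftp.open/2` returns the pid of an FTP client process that owns the underlying TCP connection; throughout this post we call it the socket.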
@impl GenServer
def handle_continue(:start_ftp_connection, %__MODULE__{host: host, port: port, username: username, password: password} = state) do
  # Open FTP Connection
  {:ok, socket} = :ftp.open(host, port: port)
  # Authenticate
  :ok = :ftp.user(socket, username, password)
  schedule_fetch_files()
  {:noreply, %__MODULE__{state | socket: socket}}
end
Fetching files from the FTP server
What’s the `schedule_fetch_files/0` function from the previous snippet? It schedules a `:fetch_files` message to be sent to the same `GenServer` process five seconds from now, so the process polls the FTP Server every five seconds. Let’s see its implementation.
defp schedule_fetch_files do
  # self() is the GenServer's pid. to_timeout/1 requires Elixir 1.17+;
  # on older versions, :timer.seconds(5) gives the same 5000 ms.
  Process.send_after(self(), :fetch_files, to_timeout(second: 5))
end
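To watch the scheduling loop on its own, here is a hypothetical `TickerDemo` server. It re-arms its timer after each tick and reports every tick to a `report_to` pid, which is an assumption made for observability. It uses a short interval so the loop is visible quickly.

```elixir
defmodule TickerDemo do
  # Hypothetical poller that re-schedules itself, like schedule_fetch_files/0.
  # It sends {:tick, n} to `report_to` so the loop can be observed.
  use GenServer

  def start_link({report_to, interval_ms}) do
    GenServer.start_link(__MODULE__, {report_to, interval_ms})
  end

  @impl GenServer
  def init({report_to, interval_ms}) do
    schedule(interval_ms)
    {:ok, %{report_to: report_to, interval_ms: interval_ms, ticks: 0}}
  end

  @impl GenServer
  def handle_info(:fetch_files, state) do
    ticks = state.ticks + 1
    send(state.report_to, {:tick, ticks})
    # Re-arm only after the work is done, so ticks never overlap
    schedule(state.interval_ms)
    {:noreply, %{state | ticks: ticks}}
  end

  defp schedule(interval_ms), do: Process.send_after(self(), :fetch_files, interval_ms)
end
```

Re-arming the timer after the work finishes, instead of using `:timer.send_interval/2`, matters when the work is slow. A slow FTP round-trip delays the next tick rather than piling up queued `:fetch_files` messages.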
Plain messages sent to the `GenServer` (anything that isn’t a call or a cast), such as our `:fetch_files` message, are handled by the `handle_info/2` callback. The callback that fetches the files does the following:
- Iterate over the callers’ requests.
- For each request, fetch the file from the FTP Server.
  - If the file exists, send the file contents to the caller and remove the request from `requests`.
  - If it doesn’t exist, keep the request in `requests`.
- Whatever the result, call `schedule_fetch_files/0` to try fetching the files again.
Let’s see it in action:
# Logger macros require `require Logger` at the top of the module
@impl GenServer
def handle_info(:fetch_files, %__MODULE__{socket: socket, requests: requests} = state) do
  requests =
    Map.reject(requests, fn {_id, {from_pid, file_name}} ->
      # Here, we are storing the contents in memory. We could upload the contents to S3 or any other storage.
      # See :ftp docs: https://www.erlang.org/doc/apps/ftp/ftp.html
      case :ftp.recv_bin(socket, file_name) do
        {:error, :epath} ->
          Logger.debug("file #{file_name} not found")
          # Keep the request; we'll retry on the next tick
          false

        {:ok, bin} ->
          Logger.debug("file found bin=#{file_name}")
          send(from_pid, {:ftp_file_fetched, to_string(file_name), bin})
          # Drop the request; the caller has been notified
          true
      end
    end)

  new_state = %__MODULE__{state | requests: requests}
  schedule_fetch_files()
  {:noreply, new_state}
end
I’ve added some `Logger` calls to help us know what’s happening in the weeds.
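The keep-or-drop logic inside `Map.reject/2` can be exercised without an FTP Server by injecting the fetch function. In the hypothetical `FetchRequestsDemo` below, `fetch_fun` stands in for `:ftp.recv_bin/2`. It is assumed to return `{:ok, binary}` or `{:error, :epath}`, the two results the callback above handles.

```elixir
defmodule FetchRequestsDemo do
  # Hypothetical pure version of the :fetch_files logic. `fetch_fun` stands
  # in for :ftp.recv_bin/2 and returns {:ok, binary} or {:error, :epath}.
  def fetch_requests(requests, fetch_fun) do
    Map.reject(requests, fn {_id, {from_pid, file_name}} ->
      case fetch_fun.(file_name) do
        {:error, :epath} ->
          # Not uploaded yet: keep the request
          false

        {:ok, bin} ->
          send(from_pid, {:ftp_file_fetched, to_string(file_name), bin})
          # Caller notified: drop the request
          true
      end
    end)
  end
end
```

Given one file that exists and one that doesn’t, only the missing file’s request survives, and the caller receives `{:ftp_file_fetched, "ready.txt", "hello"}` for the other.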
A big concern with this implementation is: why are we iterating over the requests one by one instead of fetching the files asynchronously?
The reason is that an `:ftp` socket is a TCP connection under the hood, and it can’t be shared between processes unless we build a synchronization mechanism. Can you imagine how a socket could keep track of reading from the network and sending data to the caller across N processes? Who gets which chunk of the buffered data? This exploration deserves its own blog post.
We’ll see how to increase the asynchronicity of the FTP Client by using `:poolboy` in Part 2 😉.
Now that we’ve seen how to fetch files, what about requesting files from our `FtpClient` and receiving them asynchronously?
:gen_tcp active/passive modes
As I mentioned previously, `:ftp` uses TCP (`:gen_tcp`) under the hood. A `:gen_tcp` socket controls how incoming data is received through its `:active` option:
- `active: true`. Data is delivered to the owning process (e.g. a `GenServer`) as messages.
- `active: false` (passive). Data must be retrieved by calling `:gen_tcp.recv/2`.

`:ftp.open/2` accepts a `:mode` option that can be `:active` or `:passive`, but I couldn’t make it deliver data asynchronously as messages to the `GenServer`. As far as I can tell, that option selects the FTP data-connection mode (whether the server or the client opens the data connection) rather than the `:gen_tcp` message-delivery mode. `:ftp.open/2` uses passive mode by default. I’ll update this post if I find out how to achieve this.
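To see what the `:gen_tcp` distinction looks like in practice, here is a hypothetical loopback demo with no FTP involved. It opens a listener on a random local port and connects to it. A passive client has to pull the bytes with `:gen_tcp.recv/3`, while an active client finds them in its mailbox as `{:tcp, socket, data}` messages.

```elixir
defmodule TcpModesDemo do
  # Hypothetical loopback demo of :gen_tcp passive vs active sockets.

  def passive_roundtrip(payload) do
    with_pair([:binary, active: false], fn client, server ->
      :ok = :gen_tcp.send(server, payload)
      # Passive: we must pull exactly the bytes we expect
      {:ok, data} = :gen_tcp.recv(client, byte_size(payload), 1_000)
      data
    end)
  end

  def active_roundtrip(payload) do
    with_pair([:binary, active: true], fn client, server ->
      :ok = :gen_tcp.send(server, payload)
      # Active: the bytes arrive in our mailbox as a message
      receive do
        {:tcp, ^client, data} -> data
      after
        1_000 -> :timeout
      end
    end)
  end

  # Opens a listener on a random local port, connects a client to it,
  # runs `fun`, and always closes all three sockets.
  defp with_pair(client_opts, fun) do
    {:ok, listen} = :gen_tcp.listen(0, [:binary, active: false, ip: {127, 0, 0, 1}])
    {:ok, port} = :inet.port(listen)
    {:ok, client} = :gen_tcp.connect({127, 0, 0, 1}, port, client_opts)
    {:ok, server} = :gen_tcp.accept(listen, 1_000)

    try do
      fun.(client, server)
    after
      Enum.each([client, server, listen], &:gen_tcp.close/1)
    end
  end
end
```

For short payloads over loopback, both functions return the payload unchanged. In general, active mode may deliver data split across several messages.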
Get FTP file asynchronously
Here are the API function and the callback that implement `:get_file`:
# The caller of this get_file/2 fn needs to send its own pid.
def get_file(from_pid, file_name) when is_pid(from_pid) do
  GenServer.cast({:global, __MODULE__}, {:get_file, from_pid, to_charlist(file_name)})
end

@impl GenServer
def handle_cast({:get_file, from_pid, file_name}, %__MODULE__{requests: requests} = state) do
  request_id = generate_id(file_name)
  new_state = %__MODULE__{state | requests: Map.put(requests, request_id, {from_pid, file_name})}
  {:noreply, new_state}
end
The `generate_id/1` function computes a hash of the `file_name`. A hash function always returns the same output for the same input, which lets us deduplicate requests for the same `file_name`. But what about two different callers requesting the same `file_name`? With `Map.put/3`, the second request overwrites the first, and the first caller is never notified.
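Here is the overwrite in isolation. `OverwriteDemo` is a hypothetical module that applies the same `Map.put/3` call twice with the same hashed key. Only the second caller survives.

```elixir
defmodule OverwriteDemo do
  # Hypothetical illustration: two callers request the same file with Map.put/3.
  def requests_after_two_callers(caller_a, caller_b, file_name) do
    key = :crypto.hash(:blake2s, file_name)

    %{}
    |> Map.put(key, {caller_a, file_name})
    # Same key, so this silently replaces caller_a's request
    |> Map.put(key, {caller_b, file_name})
  end
end
```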
Multiple caller pids for the same file name
Let’s improve the implementation by storing a list of caller pids in the map value and updating it with `Map.update/4`.
@impl GenServer
def handle_cast({:get_file, from_pid, file_name}, %__MODULE__{requests: requests} = state) do
  request_id = generate_id(file_name)

  new_state = %__MODULE__{
    state
    | requests:
        Map.update(requests, request_id, {[from_pid], file_name}, fn {pids, file_name} ->
          {[from_pid | pids], file_name}
        end)
  }

  {:noreply, new_state}
end
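The accumulation can be checked without a running server. `MultiCallerDemo.add_request/3` below is a hypothetical helper that performs the same `Map.update/4` call as the callback above. The first request creates `{[pid], file_name}`, and later requests prepend their pid, so the newest caller comes first.

```elixir
defmodule MultiCallerDemo do
  # Hypothetical helper mirroring the Map.update/4 call in handle_cast/2.
  def add_request(requests, from_pid, file_name) do
    request_id = :crypto.hash(:blake2s, file_name)

    # First request: insert the default; later requests: prepend the pid
    Map.update(requests, request_id, {[from_pid], file_name}, fn {pids, name} ->
      {[from_pid | pids], name}
    end)
  end
end
```

Note that the same pid calling twice would appear twice in the list.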
We also need to update the `handle_info/2` callback for `:fetch_files`. The anonymous function passed to `Map.reject/2` now destructures a list of pids, `from_pids`, and replies to each of them:
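Map.reject(requests, fn {_id, {from_pids, file_name}} ->
  case :ftp.recv_bin(socket, file_name) do
    {:error, :epath} ->
      Logger.debug("file #{file_name} not found")
      false

    {:ok, bin} ->
      Logger.debug("file found bin=#{file_name}")
      # Send the reply to each of the caller pids
      Enum.each(from_pids, &send(&1, {:ftp_file_fetched, to_string(file_name), bin}))
      true
  end
end)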
File name hash function
Great! Now, the hash function that generates the request id. I’m using BLAKE2 (`:blake2s`), one of the fast hash functions natively supported by Erlang’s `:crypto` module. As I said, this function returns the same output for the same `file_name`.
defp generate_id(file_name) do
  :crypto.hash(:blake2s, file_name)
end
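A quick way to confirm the determinism claim is the hypothetical `RequestIdDemo` below, which wraps the same `:crypto.hash/2` call. The digest is always 32 bytes. Because `:crypto.hash/2` hashes iodata, an ASCII file name gives the same id as a charlist or as a binary. Since only equality matters here, using the charlist file name itself as the map key would likely deduplicate just as well; hashing mainly gives fixed-size keys.

```elixir
defmodule RequestIdDemo do
  # Hypothetical wrapper around the same call generate_id/1 makes:
  # a 32-byte BLAKE2s digest of the file name.
  def generate_id(file_name), do: :crypto.hash(:blake2s, file_name)
end
```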
Full implementation
Here is our full implementation of the module. Go and play around with this!
defmodule FtpClient do
  @moduledoc """
  GenServer to poll an FTP Server for files.
  """
  use GenServer

  require Logger

  defstruct [:host, :port, :username, :password, :socket, requests: %{}]

  ## API

  def start_link({host, port, username, password}) do
    GenServer.start_link(__MODULE__, {host, port, username, password}, name: {:global, __MODULE__})
  end

  @doc """
  Lists FTP contents.
  """
  def ls do
    GenServer.call({:global, __MODULE__}, :ls)
  end

  @doc """
  Send file to FTP Server.
  """
  def send_file(file_name) do
    GenServer.call({:global, __MODULE__}, {:send_file, to_charlist(file_name)})
  end

  @doc """
  Get an FTP file asynchronously by name. The result will be sent to the
  calling process.
  """
  def get_file(from_pid, file_name) when is_pid(from_pid) do
    GenServer.cast({:global, __MODULE__}, {:get_file, from_pid, to_charlist(file_name)})
  end

  ## Callbacks

  @impl GenServer
  def init({host, port, username, password}) do
    {:ok,
     %__MODULE__{
       host: to_charlist(host),
       # :ftp.open/2 expects an integer port
       port: port,
       username: to_charlist(username),
       password: to_charlist(password)
     }, {:continue, :start_ftp_connection}}
  end

  @impl GenServer
  def handle_continue(:start_ftp_connection, state) do
    # Open FTP Connection
    {:ok, socket} = :ftp.open(state.host, port: state.port)
    # Authenticate
    :ok = :ftp.user(socket, state.username, state.password)
    schedule_fetch_files()
    {:noreply, %__MODULE__{state | socket: socket}}
  end

  @impl GenServer
  def handle_call(:ls, _from, %__MODULE__{socket: socket} = state) do
    {:ok, result} = :ftp.ls(socket)
    {:reply, to_string(result), state}
  end

  def handle_call({:send_file, file_name}, _from, %__MODULE__{socket: socket} = state) do
    :ok = :ftp.send(socket, file_name)
    {:reply, :ok, state}
  end

  @impl GenServer
  def handle_cast({:get_file, from_pid, file_name}, %__MODULE__{requests: requests} = state) do
    request_id = generate_id(file_name)

    # Keep every caller that asked for the same file
    requests =
      Map.update(requests, request_id, {[from_pid], file_name}, fn {pids, file_name} ->
        {[from_pid | pids], file_name}
      end)

    {:noreply, %__MODULE__{state | requests: requests}}
  end

  @impl GenServer
  def handle_info(:fetch_files, %__MODULE__{socket: socket, requests: requests} = state) do
    requests =
      Map.reject(requests, fn {_id, {from_pids, file_name}} ->
        case :ftp.recv_bin(socket, file_name) do
          {:error, :epath} ->
            Logger.debug("file #{file_name} not found")
            false

          {:ok, bin} ->
            Logger.debug("file found bin=#{file_name}")
            Enum.each(from_pids, &send(&1, {:ftp_file_fetched, to_string(file_name), bin}))
            true
        end
      end)

    schedule_fetch_files()
    {:noreply, %__MODULE__{state | requests: requests}}
  end

  defp schedule_fetch_files do
    Process.send_after(self(), :fetch_files, to_timeout(second: 5))
  end

  defp generate_id(file_name) do
    :crypto.hash(:blake2s, file_name)
  end
end
IEx dry-run
Use this free FTP Server online: https://sftpcloud.io/tools/free-ftp-server.
Here is a dry-run of using the module in `iex`:
iex> FtpClient.start_link({"eu-central-1.sftpcloud.io", 21, "<username>", "<password>"})
{:ok, #PID<0.616.0>}
iex> FtpClient.ls()
""
# Get a file. We subscribe ourselves to receive the file contents with `self()`
iex> FtpClient.get_file(self(), ".tool-versions")
:ok
# You should see this log every five seconds
[debug] line=83 file .tool-versions not found
# Now, let's upload the local .tool-versions file from the current directory!
iex> FtpClient.send_file(".tool-versions")
:ok
# You should see this log; the "not found" logs should stop.
[debug] line=87 file found bin=.tool-versions
# Now, let's check if we received the file contents
iex> flush()
{:ftp_file_fetched, ".tool-versions", "erlang 27.1.2\nelixir 1.17.3-otp-27\n"}
# TADA!! 🥳
Using the FtpClient in a Phoenix application
Our `FtpClient` is OTP compliant. When we wrote `use GenServer` in the `FtpClient` module, the macro automatically defined a `child_spec/1` function, which allows it to be started as part of any supervision tree. Here is the code if you want to check it out.
We can add our `FtpClient` to the main app supervision tree by modifying the following in `application.ex`:
# host, port, username, and password come from your app's configuration
children = [
  # ... your other children
  # Our FtpClient
  {FtpClient, {host, port, username, password}}
]

Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
You’re now ready to use `FtpClient.get_file(self(), "my_file!")` in your application.
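On the receiving side, any process that called `get_file/2` handles a `{:ftp_file_fetched, file_name, contents}` message. The sketch below is a hypothetical consumer, `MyApp.ReportWatcher`, that stores the contents of one file. The actual `FtpClient.get_file/2` call is left as a comment so the sketch runs without an FTP Server.

```elixir
defmodule MyApp.ReportWatcher do
  # Hypothetical consumer of FtpClient: waits for one file and keeps its contents.
  use GenServer

  def start_link(file_name), do: GenServer.start_link(__MODULE__, file_name)

  def contents(pid), do: GenServer.call(pid, :contents)

  @impl GenServer
  def init(file_name) do
    # In a real app, subscribe here:
    # FtpClient.get_file(self(), file_name)
    {:ok, %{file_name: file_name, contents: nil}}
  end

  @impl GenServer
  # Matches only the file this process asked for (file_name appears twice)
  def handle_info({:ftp_file_fetched, file_name, bin}, %{file_name: file_name} = state) do
    {:noreply, %{state | contents: bin}}
  end

  # Ignore any other message instead of crashing
  def handle_info(_other, state), do: {:noreply, state}

  @impl GenServer
  def handle_call(:contents, _from, state), do: {:reply, state.contents, state}
end
```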
Notes
If you receive an error saying `:ftp` is not available, add it to `extra_applications` in your `mix.exs` file:
...
extra_applications: [:logger, :runtime_tools, :ftp] # <--- Add :ftp
...
There are some edge cases my module doesn’t cover; use the following as an exercise to build a more robust FTP Client!
What happens if:
- The same caller calls `FtpClient.get_file/2` several times?
- The FTP connection is closed?
- `:ftp.recv_bin/2` returns something other than `{:error, :epath}`?
- The server crashes? Are the requests lost? How can we recover from this?
- The requests keep growing because files are never uploaded to the FTP Server, so we keep requesting more and more files?
- The host, port, username, and password combination doesn’t open an FTP connection? How do we recover from this?
- What code changes are needed to support multiple `FtpClient`s connected to different FTP Servers?
In Part 2, we’ll learn how to increase the concurrency of reading files by leveraging a pool of `FtpClientWorker`s using `:poolboy`.