Elixir FTP Client: GenServer - Part 1

Posted on:2024-11-23 | 12 min read

At my current job, a new requirement came in:

Connect to an FTP Server and check if a file exists. We don’t know when the file is going to be uploaded. When the file is uploaded, notify a process with the file contents.

How should we approach this?

Intro

Erlang and Elixir provide us with libraries and abstractions to solve this requirement. Erlang ships with the :ftp module, which lets us connect to an FTP Server (optionally over TLS). Elixir gives us the GenServer abstraction, which allows us to build a long-lived process that polls the FTP Server and checks whether the file has been uploaded.

We’ll build the feature incrementally, exploring the pros/cons of our decisions.

We’ll start with this:

[Diagram not available]

Erlang’s :ftp module

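Erlang’s :ftp module is built around simplicity. Here are the functions we’ll use:

  • :ftp.open/2 opens a connection to the FTP Server.
  • :ftp.user/3 logs in with a username and password.
  • :ftp.ls/1 lists the contents of the current remote directory.
  • :ftp.send/2 uploads a local file.
  • :ftp.recv_bin/2 downloads a remote file as a binary.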
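To get a feel for the module before wrapping it in a GenServer, here is a minimal sketch that calls :ftp directly. The module name FtpSketch and its fetch/5 function are hypothetical and not part of the FtpClient built in this post; error handling is reduced to a `with` chain.

```elixir
defmodule FtpSketch do
  @moduledoc """
  Hypothetical helper (not part of this post's FtpClient): downloads one file
  by calling Erlang's :ftp functions directly.
  """

  # Returns {:ok, binary} on success or {:error, reason} on failure.
  def fetch(host, port, username, password, file_name) when is_integer(port) do
    # Erlang APIs expect charlists for text; the port is a plain integer.
    with {:ok, pid} <- :ftp.open(to_charlist(host), port: port) do
      try do
        with :ok <- :ftp.user(pid, to_charlist(username), to_charlist(password)) do
          :ftp.recv_bin(pid, to_charlist(file_name))
        end
      after
        # Always close the connection, even when login or download fails.
        :ftp.close(pid)
      end
    end
  end
end
```

Something like `FtpSketch.fetch("ftp.example.com", 21, "user", "secret", "report.csv")` would then return the file contents or an error tuple. In a plain script you may need to call `Application.ensure_all_started(:ftp)` first.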

FtpClient API

Let’s start by defining the API the module will expose. The functions ls and send_file are exposed to be able to debug the FTP connection.

defmodule FtpClient do
  @moduledoc """
  GenServer to poll an FTP Server for files.
  """
  use GenServer

  ## API

  @doc """
  Lists FTP contents.
  """
  def ls do
  end

  @doc """
  Send file to FTP Server.
  """
  def send_file(file_name) do
  end

  @doc """
  Get an FTP file asynchronously by name. The result will be sent to the
  calling process.
  """
  def get_file(from_pid, file_name) when is_pid(from_pid) do
  end
end

Now that we’ve defined the API, let’s fill in the implementation.

defmodule FtpClient do
  @moduledoc """
  GenServer to poll an FTP Server for files.
  """
  use GenServer

  ## API

  @doc """
  Lists FTP contents.
  """
  def ls do
    GenServer.call({:global, __MODULE__}, :ls)
  end

  @doc """
  Send file to FTP Server.
  """
  def send_file(file_name) do
    GenServer.call({:global, __MODULE__}, {:send_file, to_charlist(file_name)})
  end

  @doc """
  Get an FTP file asynchronously by name. The result will be sent to the
  calling process.
  """
  def get_file(from_pid, file_name) when is_pid(from_pid) do
    GenServer.cast({:global, __MODULE__}, {:get_file, from_pid, to_charlist(file_name)})
  end
end

Global name registry

Note that we are using {:global, __MODULE__} as the name of the GenServer. Depending on your situation, you might choose different name registration strategies. These strategies are listed in the official docs.

I’ve chosen {:global, term()} because our Elixir nodes are clustered. When we use a global process in clustered nodes, we need to be aware of split-brain scenarios and other network synchronization issues. See this great post talking about the drawbacks of a single global process.
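To see global registration on its own, here is a small sketch with a hypothetical GlobalCounter server that has nothing to do with FTP. It only shows how a {:global, name} registration is resolved and that a second process cannot claim the same name. It runs on a single, non-clustered node too.

```elixir
defmodule GlobalCounter do
  # Hypothetical demo server; it only illustrates :global naming.
  use GenServer

  def start_link(initial) do
    GenServer.start_link(__MODULE__, initial, name: {:global, __MODULE__})
  end

  def increment, do: GenServer.call({:global, __MODULE__}, :increment)

  @impl GenServer
  def init(initial), do: {:ok, initial}

  @impl GenServer
  def handle_call(:increment, _from, count), do: {:reply, count + 1, count + 1}
end

{:ok, pid} = GlobalCounter.start_link(0)

# :global resolves the registered name to the pid, from any connected node.
^pid = :global.whereis_name(GlobalCounter)

# A second registration under the same name is rejected.
{:error, {:already_started, ^pid}} = GlobalCounter.start_link(0)
```

The {:error, {:already_started, pid}} result is what a supervisor on a second node would see when the process already runs elsewhere in the cluster.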

To register a global process, we need to pass the :name option to GenServer.start_link/3. Our FtpClient.start_link/1 is part of our public API, and it’s defined like this:

def start_link({host, port, username, password}) do
  GenServer.start_link(__MODULE__, {host, port, username, password}, name: {:global, __MODULE__})
end

It accepts a single tuple with four elements: host, port, username, and password. We’ll use this data to make the FTP connection.

GenServer state

As we are receiving the host, port, username, and password required to make a connection with the FTP Server, we need to store these values in the GenServer state. GenServer.start_link/3 invokes our module’s init/1 callback with the tuple we passed in.

But… how do we define the GenServer state?

The state is whatever value you return from the GenServer’s init/1 callback; it is normally a map %{}. From experience, I’ve learned that it’s great to use defstruct to define the GenServer state: it helps with documentation and lets us specify default values.

The :requests key-value holds the caller’s requests, which consists of a key (hash of the file name), and a tuple in the form of {caller_pid, file_name}. We’ll use the caller_pid (caller process identifier) to send the file contents when the file is available in the FTP Server.

defmodule FtpClient do
  ...
  defstruct [:host, :port, :username, :password, :socket, requests: %{}]

  ## API

  def start_link({host, port, username, password}) do
    GenServer.start_link(__MODULE__, {host, port, username, password}, name: {:global, __MODULE__})
  end

  # ... previous API functions

  ## Callbacks

  @impl GenServer
  def init({host, port, username, password}) do
    {:ok,
     %__MODULE__{
       host: to_charlist(host),
       # The port stays an integer: :ftp expects a port number, not a charlist.
       port: port,
       username: to_charlist(username),
       password: to_charlist(password)
     }, {:continue, :start_ftp_connection}}
  end
end

I’m calling to_charlist/1 on the host, username, and password because Erlang modules work with charlists instead of Elixir strings. Given that we are going to be using the :ftp module heavily, I convert these arguments once, up front. The port is left as an integer, which is what :ftp.open/2 expects.

What about the third tuple item {:continue, :start_ftp_connection}? As described in the init/1 documentation, this option allows us to specify a callback that will be invoked immediately after entering the GenServer loop. This callback is handle_continue/2, and it’ll be called with the value we put in the second position of the tuple, in this case the :start_ftp_connection atom.
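Here is a minimal sketch of that ordering guarantee, assuming a hypothetical SlowStart server whose "connection" is faked with a short sleep. It shows that a call sent right after start_link/1 is only handled once handle_continue/2 has finished.

```elixir
defmodule SlowStart do
  # Hypothetical demo: the slow setup work is simulated with Process.sleep/1.
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)
  def status(pid), do: GenServer.call(pid, :status)

  @impl GenServer
  def init(:ok) do
    # Return quickly so the caller of start_link/1 is not blocked by setup work.
    {:ok, %{status: :connecting}, {:continue, :connect}}
  end

  @impl GenServer
  def handle_continue(:connect, state) do
    # Runs before any other message in the mailbox is processed.
    Process.sleep(50)
    {:noreply, %{state | status: :connected}}
  end

  @impl GenServer
  def handle_call(:status, _from, state), do: {:reply, state.status, state}
end

{:ok, pid} = SlowStart.start_link()

# The call is queued behind handle_continue/2, so it never observes :connecting.
:connected = SlowStart.status(pid)
```

The trade-off is that start_link/1 returns before the connection exists, so a failure in handle_continue/2 crashes the process after it has already been reported as started.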

Starting an FTP connection

As you can guess from the name, the handle_continue/2 callback will make the connection to the FTP Server using the :ftp Erlang module and store the socket in the GenServer state for future usage.

@impl GenServer
def handle_continue(:start_ftp_connection, %__MODULE__{host: host, port: port, username: username, password: password} = state) do
  # Open FTP Connection
  {:ok, socket} = :ftp.open(host, port: port)
  # Authenticate
  :ok = :ftp.user(socket, username, password)
  schedule_fetch_files()

  {:noreply, %__MODULE__{state | socket: socket}}
end

Fetching files from the FTP server

What’s the schedule_fetch_files/0 function from the previous code snippet? It schedules a :fetch_files message to be sent to the same GenServer process five seconds from now. Let’s see its implementation.

defp schedule_fetch_files do
  # self() is the GenServer's pid.
  # to_timeout/1 is available since Elixir 1.17.
  Process.send_after(self(), :fetch_files, to_timeout(second: 5))
end
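The same "send myself a message later" pattern can be tried in isolation. This is a sketch with a hypothetical Ticker server that counts ticks instead of polling an FTP Server, and uses a short interval in milliseconds so the effect is visible quickly.

```elixir
defmodule Ticker do
  # Hypothetical demo: counts ticks instead of fetching files.
  use GenServer

  def start_link(interval_ms), do: GenServer.start_link(__MODULE__, interval_ms)
  def ticks(pid), do: GenServer.call(pid, :ticks)

  @impl GenServer
  def init(interval_ms) do
    schedule(interval_ms)
    {:ok, %{interval_ms: interval_ms, ticks: 0}}
  end

  @impl GenServer
  def handle_info(:tick, state) do
    # Do the work first, then re-arm the timer, so ticks never overlap.
    new_state = %{state | ticks: state.ticks + 1}
    schedule(state.interval_ms)
    {:noreply, new_state}
  end

  @impl GenServer
  def handle_call(:ticks, _from, state), do: {:reply, state.ticks, state}

  defp schedule(interval_ms), do: Process.send_after(self(), :tick, interval_ms)
end

{:ok, pid} = Ticker.start_link(10)
Process.sleep(100)
IO.inspect(Ticker.ticks(pid), label: "ticks after ~100 ms")
```

Because the next message is scheduled only after the current one is handled, a slow poll delays the following one instead of piling up work.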

Messages that reach the GenServer outside of call and cast, such as the one scheduled with Process.send_after/3, are handled by the handle_info/2 callback. The clause that takes care of fetching the files will do the following:

  1. Iterate over the requests of the callers.
  2. For each request, fetch the file from the FTP Server.
    • If the file exists, send the file contents to the caller and remove the request from the requests.
    • If it doesn’t exist, keep the request in the requests.
  3. Whatever the result, call schedule_fetch_files/0 so the files are fetched again on the next poll.

Let’s see it in action:

@impl GenServer
def handle_info(:fetch_files, %__MODULE__{socket: socket, requests: requests} = state) do
  requests =
    Map.reject(requests, fn {_id, {from_pid, file_name}} ->
      # Here, we are storing the contents in memory. We could upload the contents to S3 or any other storage.
      # See :ftp docs: https://www.erlang.org/doc/apps/ftp/ftp.html
      case :ftp.recv_bin(socket, file_name) do
        {:error, :epath} ->
          Logger.debug("file #{file_name} not found")
          false

        {:ok, bin} ->
          Logger.debug("file found bin=#{file_name}")
          send(from_pid, {:ftp_file_fetched, to_string(file_name), bin})
          true
      end
    end)

  new_state = %__MODULE__{state | requests: requests}
  schedule_fetch_files()

  {:noreply, new_state}
end

I’ve added some Logger calls to help us know what’s happening in the weeds.
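The true/false return values are easy to mix up, so here is a sketch of the same Map.reject/2 logic without a server. The fake_recv_bin function is a hypothetical stand-in for :ftp.recv_bin/2 that pretends only a.txt exists.

```elixir
# Pending requests, shaped like the GenServer state: id => {caller_pid, file_name}.
requests = %{
  "id-a" => {self(), ~c"a.txt"},
  "id-b" => {self(), ~c"b.txt"}
}

# Hypothetical stand-in for :ftp.recv_bin/2: only a.txt "exists" on the server.
fake_recv_bin = fn file_name ->
  if file_name == ~c"a.txt" do
    {:ok, "contents of a"}
  else
    {:error, :epath}
  end
end

remaining =
  Map.reject(requests, fn {_id, {from_pid, file_name}} ->
    case fake_recv_bin.(file_name) do
      {:ok, bin} ->
        send(from_pid, {:ftp_file_fetched, to_string(file_name), bin})
        # true means "reject": the request is fulfilled and dropped.
        true

      {:error, :epath} ->
        # false means "keep": try again on the next poll.
        false
    end
  end)

IO.inspect(Map.keys(remaining), label: "still pending")
```

Only the request for b.txt stays in the map, and the calling process has one :ftp_file_fetched message waiting in its mailbox.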

A big concern that arises with this implementation is: why are we iterating over the requests one by one instead of fetching the files asynchronously?

The reason is that an :ftp socket is a TCP connection under the hood, and this socket can’t be shared between processes unless we build a synchronization mechanism. Can you imagine how a socket could keep track of reading the network and sending data to the caller between N processes? Who gets what chunk of the buffered data/messages? This exploration deserves its own blog post.

We’ll see how to increase the asynchronicity of the FTP Client by using :poolboy in Part 2 😉.

Now that we’ve seen how to fetch files, what about requesting files from our FtpClient and receiving them asynchronously?

:gen_tcp active/passive modes

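As I mentioned previously, :ftp uses TCP (:gen_tcp) under the hood. A :gen_tcp socket can run in active mode, where incoming data is delivered to the owning process as messages, or in passive mode, where the owner reads from the socket explicitly.

The :ftp.open/2 function accepts a :mode option (:active or :passive), but I couldn’t use it to receive data asynchronously as messages in the GenServer. As far as I can tell, that option selects FTP’s active or passive data-connection mode rather than the :gen_tcp socket mode. I’ll update this post in the future if I find out how to achieve this.

:ftp.open/2 uses passive mode by default.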

Get FTP file asynchronously

Here are the API function and the callback that implement :get_file.

# The caller of get_file/2 needs to pass its own pid.
def get_file(from_pid, file_name) when is_pid(from_pid) do
  GenServer.cast({:global, __MODULE__}, {:get_file, from_pid, to_charlist(file_name)})
end

@impl GenServer
def handle_cast({:get_file, from_pid, file_name}, %__MODULE__{requests: requests} = state) do
  request_id = generate_id(file_name)
  new_state = %__MODULE__{state | requests: Map.put(requests, request_id, {from_pid, file_name})}

  {:noreply, new_state}
end

The generate_id/1 function uses the file_name to compute a hash. A hash function returns the same output for the same input. This helps us to deduplicate requests for the same file_name. But what about two different callers requesting the same file_name?

Multiple caller pids for the same file name

Let’s improve the implementation by storing a list of caller pids in the map value instead of a single pid.

@impl GenServer
def handle_cast({:get_file, from_pid, file_name}, %__MODULE__{requests: requests} = state) do
  request_id = generate_id(file_name)

  new_state = %__MODULE__{
    state
    | requests:
        Map.update(requests, request_id, {[from_pid], file_name}, fn {pids, file_name} ->
          {[from_pid | pids], file_name}
        end)
  }

  {:noreply, new_state}
end
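To check how the requests map grows, here is a sketch that mirrors the handle_cast/2 body above without the GenServer. The add_request function and the "id-1" key are hypothetical; in the real module the key comes from generate_id/1.

```elixir
# Hypothetical helper with the same Map.update/4 logic as the callback.
add_request = fn requests, id, pid, file_name ->
  Map.update(requests, id, {[pid], file_name}, fn {pids, name} ->
    {[pid | pids], name}
  end)
end

# Two short-lived processes stand in for two different callers.
caller_a = spawn(fn -> :ok end)
caller_b = spawn(fn -> :ok end)

requests =
  %{}
  |> add_request.("id-1", caller_a, ~c"report.csv")
  |> add_request.("id-1", caller_b, ~c"report.csv")

IO.inspect(requests, label: "requests")
```

Both callers end up under one entry, so the file is downloaded once per poll no matter how many processes are waiting for it. Note that a caller asking twice for the same file is stored twice and would be notified twice.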

We also need to update the :fetch_files clause of handle_info/2, because each request now holds a list of pids. Change the pattern in Map.reject/2 and the {:ok, bin} branch:

# The pattern now binds a list of pids:
# Map.reject(requests, fn {_id, {from_pids, file_name}} ->
# case .. do
  {:ok, bin} ->
    Logger.debug("file found bin=#{file_name}")
    # Send the reply to each of the caller pids
    Enum.each(from_pids, &send(&1, {:ftp_file_fetched, to_string(file_name), bin}))
    true
# end

File name hash function

Great! Now, the hash function that generates the request id. I’m using Blake2, one of Erlang’s fastest natively supported hash functions. As I said, this function returns the same output for the same file_name.

defp generate_id(file_name) do
  :crypto.hash(:blake2s, file_name)
end
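A quick way to convince yourself of that property is to hash a few names by hand. This sketch calls :crypto.hash/2 directly with the same :blake2s algorithm; the file names are just examples.

```elixir
# :crypto.hash/2 accepts iodata, so a charlist file name works as input.
id_1 = :crypto.hash(:blake2s, ~c".tool-versions")
id_2 = :crypto.hash(:blake2s, ~c".tool-versions")
other = :crypto.hash(:blake2s, ~c"mix.exs")

IO.inspect(id_1 == id_2, label: "same name, same id")
IO.inspect(id_1 == other, label: "different name, same id")
IO.inspect(byte_size(id_1), label: "digest size in bytes")

# The raw digest is not printable; hex-encode it if it needs to appear in logs.
IO.puts(Base.encode16(id_1, case: :lower))
```

Since the id is derived only from the file name, the file name itself would work as a map key too; the hash mainly gives fixed-size keys.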

Full implementation

Here is our full implementation of the module. Go and play around with this!

defmodule FtpClient do
  @moduledoc """
  GenServer to poll an FTP Server for files.
  """
  use GenServer

  defstruct [:host, :port, :username, :password, :socket, requests: %{}]

  require Logger

  ## API

  def start_link({host, port, username, password}) do
    GenServer.start_link(__MODULE__, {host, port, username, password}, name: {:global, __MODULE__})
  end

  @doc """
  Lists FTP contents.
  """
  def ls do
    GenServer.call({:global, __MODULE__}, :ls)
  end

  @doc """
  Send file to FTP Server.
  """
  def send_file(file_name) do
    GenServer.call({:global, __MODULE__}, {:send_file, to_charlist(file_name)})
  end

  @doc """
  Get an FTP file asynchronously by name. The result will be sent to the
  calling process.
  """
  def get_file(from_pid, file_name) when is_pid(from_pid) do
    GenServer.cast({:global, __MODULE__}, {:get_file, from_pid, to_charlist(file_name)})
  end

  ## Callbacks

  @impl GenServer
  def init({host, port, username, password}) do
    {:ok,
     %__MODULE__{
       host: to_charlist(host),
       # The port stays an integer: :ftp expects a port number, not a charlist.
       port: port,
       username: to_charlist(username),
       password: to_charlist(password),
     }, {:continue, :start_ftp_connection}}
  end

  @impl GenServer
  def handle_call(:ls, _from, %{socket: socket} = state) do
    {:ok, result} = :ftp.ls(socket)
    {:reply, to_string(result), state}
  end

  @impl GenServer
  def handle_call({:send_file, file_name}, _from, %__MODULE__{socket: socket} = state) do
    :ok = :ftp.send(socket, file_name)
    {:reply, :ok, state}
  end

  @impl GenServer
  def handle_cast({:get_file, from_pid, file_name}, %__MODULE__{requests: requests} = state) do
    request_id = generate_id(file_name)
    new_state = %__MODULE__{state | requests: Map.put(requests, request_id, {from_pid, file_name})}

    {:noreply, new_state}
  end

  @impl GenServer
  def handle_info(:fetch_files, %__MODULE__{socket: socket, requests: requests} = state) do
    requests =
      Map.reject(requests, fn {_id, {from_pid, file_name}} ->
        case :ftp.recv_bin(socket, file_name) do
          {:error, :epath} ->
            Logger.debug("file #{file_name} not found")
            false

          {:ok, bin} ->
            Logger.debug("file found bin=#{file_name}")
            send(from_pid, {:ftp_file_fetched, to_string(file_name), bin})
            true
        end
      end)

    new_state = %__MODULE__{state | requests: requests}
    schedule_fetch_files()

    {:noreply, new_state}
  end

  defp schedule_fetch_files do
    Process.send_after(self(), :fetch_files, to_timeout(second: 5))
  end

  @impl GenServer
  def handle_continue(:start_ftp_connection, state) do
    # Open FTP Connection
    {:ok, socket} = :ftp.open(state.host, port: state.port)
    # Authenticate
    :ok = :ftp.user(socket, state.username, state.password)
    schedule_fetch_files()

    {:noreply, %__MODULE__{state | socket: socket}}
  end

  defp generate_id(file_name) do
    :crypto.hash(:blake2s, file_name)
  end
end

IEx dry-run

Use this free FTP Server online: https://sftpcloud.io/tools/free-ftp-server.

Here is a dry-run of using the module in iex:

iex> FtpClient.start_link({"eu-central-1.sftpcloud.io", 21, "<username>", "<password>"})
{:ok, #PID<0.616.0>}

iex> FtpClient.ls()
""

# Get a file. We subscribe ourselves to receive the file contents with `self()`
iex> FtpClient.get_file(self(), ".tool-versions")

# You should see this log every five seconds
[debug] line=83 file .tool-versions not found

# Now, let's upload the file!
iex> FtpClient.send_file(".tool-versions")
:ok

# You should see this log; the other "not found" logs should stop.
[debug] line=87 file found bin=.tool-versions

# Now, let's check if we received the file contents
iex> flush()
{:ftp_file_fetched, ".tool-versions", "erlang 27.1.2\nelixir 1.17.3-otp-27\n"}

# TADA!! 🥳

Using the FtpClient in a Phoenix application

Our FtpClient is OTP compliant. Because the module calls use GenServer, the macro automatically defines a child_spec/1 function that allows it to be started as part of any supervision tree. Here is the code if you want to check it out.
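To see what that generated function returns, here is a sketch with a hypothetical ChildSpecDemo module. It inspects the default child spec and then starts the module under a throwaway supervisor using the same {Module, arg} tuple form.

```elixir
defmodule ChildSpecDemo do
  # Hypothetical module: only here to inspect what `use GenServer` generates.
  use GenServer

  def start_link(arg), do: GenServer.start_link(__MODULE__, arg)

  @impl GenServer
  def init(arg), do: {:ok, arg}
end

arg = {"host", 21, "user", "secret"}

# The default spec calls start_link/1 with whatever argument we hand over.
spec = ChildSpecDemo.child_spec(arg)
%{id: ChildSpecDemo, start: {ChildSpecDemo, :start_link, [^arg]}} = spec

# {ChildSpecDemo, arg} in a children list is expanded through child_spec/1.
{:ok, sup} = Supervisor.start_link([{ChildSpecDemo, arg}], strategy: :one_for_one)
IO.inspect(Supervisor.which_children(sup), label: "children")
```

This is why start_link/1 takes a single tuple: the supervisor passes exactly one argument through.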

We can add our FtpClient to the main app supervision tree by modifying the children list in application.ex:

children = [
  ...
  # Our FtpClient
  {FtpClient, {host, port, username, password}}
]

Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)

You are ready to use FtpClient.get_file(self(), "my_file!") in your application.
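On the receiving side, the caller gets a plain {:ftp_file_fetched, file_name, contents} message. Here is a sketch of a consumer process, assuming a hypothetical FileConsumer GenServer. The request to FtpClient is left as a comment and the reply is simulated with send/2 so the example runs without an FTP Server.

```elixir
defmodule FileConsumer do
  # Hypothetical consumer: remembers file contents delivered by FtpClient.
  use GenServer

  def start_link(file_name), do: GenServer.start_link(__MODULE__, file_name)
  def contents(pid), do: GenServer.call(pid, :contents)

  @impl GenServer
  def init(file_name) do
    # In a real application this is where the request would be made:
    # FtpClient.get_file(self(), file_name)
    {:ok, %{file_name: file_name, contents: nil}}
  end

  @impl GenServer
  def handle_info({:ftp_file_fetched, name, bin}, %{file_name: name} = state) do
    # Only accept the file this process asked for.
    {:noreply, %{state | contents: bin}}
  end

  # Ignore anything else so unexpected messages do not crash the process.
  def handle_info(_other, state), do: {:noreply, state}

  @impl GenServer
  def handle_call(:contents, _from, state), do: {:reply, state.contents, state}
end

{:ok, consumer} = FileConsumer.start_link("my_file!")

# Simulate the message FtpClient sends once the file shows up.
send(consumer, {:ftp_file_fetched, "my_file!", "hello"})
"hello" = FileConsumer.contents(consumer)
```

Handling the reply in handle_info/2 keeps the consumer responsive while it waits, since it never blocks in a receive.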

Notes

If you receive an error saying that :ftp is not available, add it to extra_applications in your mix.exs file:

  ...
  extra_applications: [:logger, :runtime_tools, :ftp] # <--- Add :ftp
  ...

There are some edge cases my module doesn’t cover. Use the following as an exercise to create a more robust FTP Client!

What happens if:


In Part 2, we’ll learn how to increase the concurrency of reading files by leveraging a pool of FtpClientWorkers using :poolboy.