hao2_00

By Benjamin Tan Wei Hao

In this article, excerpted from The Little Elixir & OTP Guidebook, I will guide the reader into implementing his/her own supervisor.

A supervisor is a process whose only job is to monitor one or more processes. These processes can be worker processes or even other supervisors.


hao2_01

Figure 1: A supervision tree can be layered with other supervision trees. Both supervisors and workers can be supervised.


Supervisors and workers are arranged in a supervision tree. If any of the workers die, the supervisor can restart the dead worker, and potentially other workers in the supervision tree, based on certain restart strategies. What are worker processes? They are usually processes that have implemented the GenServer, GenFSM or GenEvent behaviors.

So far, you have all the building blocks needed to build your own Supervisor. Once you are done with this section, Supervisors will not seem magical anymore, although that does not make them any less awesome.


Supervisor API

The follow table lists the API of the supervisor along with a brief description:


Table 1    A summary of APIs that we will implement

API

Description

start_link(child_spec_list)

Given a list of child specifications (possibly empty), start the supervisor process and corresponding children

start_child(supervisor, child_spec)

Given a supervisor pid and a child specification, start the child process and link it to the supervisor.

terminate_child(supervisor, pid)

Given a supervisor pid and a child pid, terminate the child.

restart_child(supervisor, pid, child_spec)

Given a supervisor pid, child pid, and a child specification, restart the child process and initialize the child process with the child specification.

count_children(supervisor)

Given the supervisor pid, return the number of child processes.

which_children(supervisor)

Given the supervisor pid, return the state of the supervisor.

Implementing the above API will give us a pretty good grasp of how the actual OTP Supervisor works under the hood.


Building Our Own Supervisor

As usual, we start with a new mix project. Since calling it Supervisor is unoriginal, and MySupervisor is boring, let’s give it some Old English flair and call it ThySupervisor instead:

% mix new thy_supervisor

As a form of revision, we are going to build our supervisor using the GenServer behavior. You might be surprised to know that the supervisor behavior does, in fact, implement the GenServer behavior.

defmodule ThySupervisor do
   use GenServer
  
 end

start_link(child_spec_list)

The first thing is to implement start_link/1.

defmodule ThySupervisor do
   use GenServer
  
   def start_link(child_spec_list) do
     GenServer.start_link(__MODULE__, [child_spec_list])
   end
  
 end

This is the main entry point to creating a supervisor process. Here, we call GenServer.start_link/2 with the name of the module and passing in a list with a single element of child_spec_list. child_spec_list specifies a list of (potentially empty) child specifications.

This is a fancy way of telling the supervisor what kinds of processes it should manage. A child specification for two (similar) workers could look like [{ThyWorker, :start_link, []}, {ThyWorker, :start_link, []}].

Recall that GenServer.start_link/2 expects the ThySupervisor.init/1 callback to be implemented. It passes the second argument (the list) into :init/1. Let’s do that:


Listing 1    thy_supervisor.ex – start_link/1 and init callback/1. Notice that exits are being trapped in the init/1 callback.

defmodule ThySupervisor do
   use GenServer
  
   #######
   # API #
   #######
  
   def start_link(child_spec_list) do
     GenServer.start_link(__MODULE__, [child_spec_list])
   end
  
   ######################
   # Callback Functions #
   ######################
  
   def init([child_spec_list]) do
     Process.flag(:trap_exit, true)                      #1 
     state = child_spec_list
               |> start_children 
               |> Enum.into(HashDict.new)
  
     {:ok, state}
   end
   end

#1 Make the supervisor process trap exits


The first thing we do here is to let the supervisor process trap exits. This is so that it can receive exit signals from its children as normal messages.

There is quite a bit going on in the lines that follow. The child_spec_list is fed into start_children/1. This function, as you will soon see, spawns the child processes and returns a list of tuples. Each tuple is a pair that contains the pid of the newly spawned child and the child specification. For example:

 [{<0.82.0>, {ThyWorker, :init, []}}, {<0.84.0>, {ThyWorker, :init, []}}]

This list is then fed into Enum.into/2. By passing in HashDict.new as the second argument, we are effectively transforming the list of tuples into a HashDict, with the pids of the child processes as the keys and the child specifications as the values.

transforming an enumerable to a collectable with enum.into

Enum.into/2 (and Enum.into/3 that takes an additional transformation function) takes an enumerable (like a List) and inserts it into a Collectable (like a HashDict. This is very helpful because HashDict knows that if it gets a tuple, the first element becomes the key, and the second element becomes the value:

iex> h = [{:pid1, {:mod1, :fun1, :arg1}}, {:pid2, {:mod2, :fun2, :arg2}}] |> Enum.into(HashDict.new)

This returns a HashDict:

#HashDict<[pid2: {:mod2, :fun2, :arg2}, pid1: {:mod1, :fun1, :arg1}]>

The key can be retrieved like so:

iex> HashDict.fetch(h, :pid2)
 {:ok, {:mod2, :fun2, :arg2}}

The resulting HashDict of pid and child specification mappings forms the state of the supervisor process, which we return in a {:ok, state} tuple, which is expected of init/1.

start_child(supervisor, child_spec)

I have not described what goes on in the private function start_children/1 that is used in init/1. Let’s skip ahead a little and look at start_child/2 first. This function takes in the supervisor pid and child specification and attaches the child to the supervisor:


Listing 2    thy_supervisor.ex – Starting a single child process

defmodule ThySupervisor do
   use GenServer
  
   #######
   # API #
   #######
  
   def start_child(supervisor, child_spec) do
     GenServer.call(supervisor, {:start_child, child_spec})
   end
  
  
  
   ######################
   # Callback Functions #
   ######################
  
   def handle_call({:start_child, child_spec}, _from, state) do
     case start_child(child_spec) do
       {:ok, pid} ->
         new_state = state |> HashDict.put(pid, child_spec)
         {:reply, {:ok, pid}, new_state}
       :error ->
         {:reply, {:error, “error starting child”}, state}
     end
   end
  
   #####################
   # Private Functions #
   #####################
  
   def start_child({mod, fun, args}) do
     case apply(mod, fun, args) do
       pid when is_pid(pid) ->
         {:ok, pid}
       _ ->
         :error
     end
   end
  
 end

The start_child/2 API call makes a synchronous call request to the supervisor. The request contains a tuple containing the :start_child atom and child specification. The request is handled by the handle_call({:start_child, child_spec}, _, _) callback. It attempts to start a new child process using the start_child/1 private function.

Upon success, the caller process receives {:ok, pid} and the state of the supervisor is updated to new_state. Otherwise, the caller process receives as tuple tagged with :error and is provided a reason.


Supervisor and Spawning Child Processes with spawn_link

Here is an important point, and we are making a large assumption here. The assumption is that we assume that the created process links to the supervisor process. What does this mean? This means that we assume that the process is spawned using spawn_link. In fact, in the OTP Supervisor behavior assumes that processes are created using spawn_link.

Starting child processes

Now, we can look at the start_children/1 function, which is used in init/1. Here it is:


Listing 3  thy_supervisor .ex – Starting children processes

defmodule ThySupervisor do
   # …
  
   #####################
   # Private Functions #
   #####################
  
     def start_children([child_spec|rest]) do
       case start_child(child_spec) do
         {:ok, pid} ->
           [{pid, child_spec}|start_children(rest)]
         :error ->
           :error
       end
     end
    
     def start_children([]), do: []
 end

The start_children/1 function takes a list of child specifications and hands start_child/1 a child specification, all the while accumulating a list of tuples. As previously seen, each tuple is a pair that contains the pid and the child specification.

How does start_child/1 do its work? Turns out, there isn’t a lot of sophisticated machinery involved.

def start_child({mod, fun, args}) do
   case apply(mod, fun, args) do
     pid when is_pid(pid) ->
       {:ok, pid}
     _ ->
       :error
   end
 end

terminate_child(supervisor, pid)

The supervisor needs a way to terminate its children. Here’s the API, callback and private function implementation:


Listing 4    thy_supervisor.ex – Terminating a single child process

defmodule ThySupervisor do
   use GenServer
  
   #######
   # API #
   #######
  
   def terminate_child(supervisor, pid) when is_pid(pid) do
     GenServer.call(supervisor, {:terminate_child, pid})
   end
  
   ######################
   # Callback Functions #
   ######################
  
   def handle_call({:terminate_child, pid}, _from, state) do
     case terminate_child(pid) do
       :ok ->
         new_state = state |> HashDict.delete(pid)
         {:reply, :ok, new_state}
       :error ->
         {:reply, {:error, “error terminating child”}, state}
     end
   end
  
   #####################
   # Private Functions #
   #####################
  
   def terminate_child(pid) do
     Process.exit(pid, :kill)
     :ok
   end
  
 end

We use Process.exit(pid, :kill) to terminate the child process. Remember how we set the supervisor to trap exits? When a child is forcibly killed using Process.exit(pid, :kill), the supervisor will receive a message in the form of {:EXIT, pid, :killed}. In order to handle this message, the handle_info/3 callback is used:


Listing 5  thy_supervisor.ex – :EXIT messages are handled via the handle_info/3 callback

def handle_info({:EXIT, from, :killed}, state) do
   new_state = state |> HashDict.delete(from)
   {:no reply, new_state}
 end

All we need to do is to update the supervisor state by remove its entry in the HashDict, and return the appropriate tuple in the callback.


restart_child(pid, child_spec)

Sometimes it is helpful to manually restart a child process. When we want to restart a child process, we need to supply the process id and the child specification. Why only the process id?   You might want to add in additional arguments, and that has to go into the child specification.

The restart_child/2 private function is a combination of terminate_child/1 and start_child/1.


Listing 6  thy_supervisor.ex – Restarting a child process

defmodule ThySupervisor do
   use GenServer
  
   #######
   # API #
   #######
  
   def restart_child(supervisor, pid, child_spec) when is_pid(pid) do
     GenServer.call(supervisor, {:restart_child, pid, child_spec})
   end
  
   ######################
   # Callback Functions #
   ######################
  
   def handle_call({:restart_child, old_pid}, _from, state) do
     case HashDict.fetch(state, old_pid) do
       {:ok, child_spec} ->
         case restart_child(old_pid, child_spec) do
           {:ok, {pid, child_spec}} ->
             new_state = state
                           |> HashDict.delete(old_pid)
                           |> HashDict.put(pid, child_spec)
             {:reply, {:ok, pid}, new_state}
           :error ->
             {:reply, {:error, “error restarting child”}, state}
         end
       _ ->
         {:reply, :ok, state}
     end
   end
  
   #####################
   # Private Functions #
   #####################
  
   def restart_child(pid, child_spec) when is_pid(pid) do
     case terminate_child(pid) do
       :ok ->
         case start_child(child_spec) do
           {:ok, new_pid} ->
             {:ok, {new_pid, child_spec}}
           :error ->
             :error
         end
       :error ->
         :error
     end
   end
  
 end

count_children(supervisor)

This function returns the number of children that is linked to the supervisor. The implementation is straightforward:


Listing 7    thy_supervisor.ex – Counting the number of child processes

defmodule ThySupervisor do
   use GenServer
  
   #######
   # API #
   #######
  
   def count_children(supervisor) do
     GenServer.call(supervisor, :count_children)
   end
  
   ######################
   # Callback Functions #
   ######################
  
   def handle_call(:count_children, _from, state) do
     {:reply, HashDict.size(state), state}
   end
  
 end

which_children(supervisor)

This is similar to count_children/1’s implementation. Because our implementation is simple, it is fine to return the entire state:


Listing 8    thy_supervisor.ex –  A simplistic implementation of which_childre/1 that returns the entire state of the supervisor

defmodule ThySupervisor do
   use GenServer
  
   #######
   # API #
   #######
  
   def which_children(supervisor) do
     GenServer.call(supervisor, :which_children)
   end
  
   ######################
   # Callback Functions #
   ######################
  
   def handle_call(:which_children, _from, state) do
     {:reply, state, state}
   end
  
 end

terminate(reason, state)

This callback is called to shutdown the supervisor process. Before we terminate the supervisor process, we need to terminate all the children it is linked to, which is handled by the terminate_children/1 private function:


Listing 9    thy_supervisor.ex – Terminating the supervisor involves terminating the child processes first

defmodule ThySupervisor do
   use GenServer
  
   ######################
   # Callback Functions #
   ######################
  
   def terminate(_reason, state) do
     terminate_children(state)
     :ok
   end
  
   #####################
   # Private Functions #
   #####################
  
   def terminate_children([]) do
     :ok
   end
  
   def terminate_children(child_specs) do
     child_specs |> Enum.each(fn {pid, _} -> terminate_child(pid) end)
   end
  
   def terminate_child(pid) do
     Process.exit(pid, :kill)
     :ok
   end
  
 end

Handling Crashes

I’ve saved the best for last. What happens when one of the child processes crashes? If you were paying attention, the supervisor would receive a message that looks like {:EXIT, pid, reason}. Once again, we use the handle_info/3 callback to handle the exit messages.

There are two cases to consider (other than :killed, which we handled in terminate_child/1).

The first case is when the process exited normally. The supervisor doesn’t have to do anything in this case, except update its state:


Listing 10    thy_supervisor.ex – Do nothing when a child process exits normally

def handle_info({:EXIT, from, :normal}, state) do
   new_state = state |> HashDict.delete(from)
   {:no reply, new_state}
 end

The second case is when the process has exited abnormally and hasn’t been forcibly killed. In that case, the supervisor should automatically restart the failed process:


Listing 11    thy_supervisor.ex – Restart a child process automatically if it exits for an abnormal reason

def handle_info({:EXIT, old_pid, _reason}, state) do
   case HashDict.fetch(state, old_pid) do
     {:ok, child_spec} ->
       case restart_child(old_pid, child_spec) do
         {:ok, {pid, child_spec}} ->
           new_state = state
                         |> HashDict.delete(old_pid)
                         |> HashDict.put(pid, child_spec)
           {:no reply, new_state}
         :error ->
           {:no reply, state}
       end
     _ ->
       {:no reply, state}
   end
 end

This above function is nothing new. It is almost the same implementation as restart_child/2, except that the child specification is reused.


Full Completed Source

Here is the full source of our hand-rolled supervisor in all its glory:


Listing 12    The full implementation of thy_supervisor.ex

defmodule ThySupervisor do
   use GenServer
  
   #######
   # API #
   #######
  
   def start_link(child_spec_list) do
     GenServer.start_link(__MODULE__, [child_spec_list])
   end
  
   def start_child(supervisor, child_spec) do
     GenServer.call(supervisor, {:start_child, child_spec})
   end
  
   def terminate_child(supervisor, pid) when is_pid(pid) do
     GenServer.call(supervisor, {:terminate_child, pid})
   end
  
   def restart_child(supervisor, pid, child_spec) when is_pid(pid) do
     GenServer.call(supervisor, {:restart_child, pid, child_spec})
   end
  
   def count_children(supervisor) do
     GenServer.call(supervisor, :count_children)
   end
  
   def which_children(supervisor) do
     GenServer.call(supervisor, :which_children)
   end
  
   ######################
   # Callback Functions #
   ######################
  
   def init([child_spec_list]) do
     Process.flag(:trap_exit, true)
     state = child_spec_list
               |> start_children               |> Enum.into(HashDict.new)
  
     {:ok, state}
   end
  
   def handle_call({:start_child, child_spec}, _from, state) do
     case start_child(child_spec) do
       {:ok, pid} ->
         new_state = state |> HashDict.put(pid, child_spec)
         {:reply, {:ok, pid}, new_state}
       :error ->
         {:reply, {:error, “error starting child”}, state}
     end
   end
  
   def handle_call({:terminate_child, pid}, _from, state) do
     case terminate_child(pid) do
       :ok ->
         new_state = state |> HashDict.delete(pid)
         {:reply, :ok, new_state}
       :error ->
         {:reply, {:error, “error terminating child”}, state}
     end
   end
  
   def handle_call({:restart_child, old_pid}, _from, state) do
     case HashDict.fetch(state, old_pid) do
       {:ok, child_spec} ->
         case restart_child(old_pid, child_spec) do
           {:ok, {pid, child_spec}} ->
             new_state = state
                           |> HashDict.delete(old_pid)
                           |> HashDict.put(pid, child_spec)
             {:reply, {:ok, pid}, new_state}
           :error ->
             {:reply, {:error, “error restarting child”}, state}
         end
       _ ->
         {:reply, :ok, state}
     end
   end
  
   def handle_call(:count_children, _from, state) do
     {:reply, HashDict.size(state), state}
   end
  
   def handle_call(:which_children, _from, state) do
     {:reply, state, state}
   end
  
   def handle_info({:EXIT, from, :normal}, state) do
     new_state = state |> HashDict.delete(from)
     {:no reply, new_state}
   end  
   def handle_info({:EXIT, from, :killed}, state) do
     new_state = state |> HashDict.delete(from)
     {:no reply, new_state}
   end
  
   def handle_info({:EXIT, old_pid, _reason}, state) do
     case HashDict.fetch(state, old_pid) do
       {:ok, child_spec} ->
         case restart_child(old_pid, child_spec) do
           {:ok, {pid, child_spec}} ->
             new_state = state
                           |> HashDict.delete(old_pid)
                           |> HashDict.put(pid, child_spec)
             {:no reply, new_state}
           :error ->
             {:no reply, state}
         end
       _ ->
         {:no reply, state}
     end
   end
  
   def terminate(_reason, state) do
     terminate_children(state)
     :ok
   end
  
   #####################
   # Private Functions #
   #####################
  
   def start_children([child_spec|rest]) do
     case start_child(child_spec) do
       {:ok, pid} ->
         [{pid, child_spec}|start_children(rest)]
       :error ->
         :error
     end
   end
  
   def start_children([]), do: []
  
   def start_child({mod, fun, args}) do
     case apply(mod, fun, args) do
       pid when is_pid(pid) ->
         {:ok, pid}
       _ ->
         :error
     end
   end
  
   def terminate_children([]) do
     :ok
   end
  
   def terminate_children(child_specs) do
     child_specs |> Enum.each(fn {pid, _} -> terminate_child(pid) end)
   end
  
   def terminate_child(pid) do
     Process.exit(pid, :kill)
     :ok
   end
  
   def restart_child(pid, child_spec) when is_pid(pid) do
     case terminate_child(pid) do
       :ok ->
         case start_child(child_spec) do
           {:ok, new_pid} ->
             {:ok, {new_pid, child_spec}}
           :error ->
             :error
         end
       :error ->
         :error
     end
   end
 end