From The Little Elixir & OTP Guidebook by Benjamin Tan Wei Hao. In this article, excerpted from The Little Elixir & OTP Guidebook, I will guide you through implementing your own supervisor.
A supervisor is a process whose only job is to monitor one or more processes. These processes can be worker processes or even other supervisors.
Figure 1: A supervision tree can be layered with other supervision trees. Both supervisors and workers can be supervised.
Supervisors and workers are arranged in a supervision tree. If any of the workers die, the supervisor can restart the dead worker, and potentially other workers in the supervision tree, based on certain restart strategies. What are worker processes? They are usually processes that have implemented the GenServer, GenFSM or GenEvent behaviors.
So far, you have all the building blocks needed to build your own Supervisor. Once you are done with this section, Supervisors will not seem magical anymore, although that does not make them any less awesome.
Supervisor API
The following table lists the API of the supervisor, along with a brief description of each function:
Table 1 A summary of APIs that we will implement

API | Description
--- | ---
start_link(child_spec_list) | Given a list of child specifications (possibly empty), start the supervisor process and corresponding children.
start_child(supervisor, child_spec) | Given a supervisor pid and a child specification, start the child process and link it to the supervisor.
terminate_child(supervisor, pid) | Given a supervisor pid and a child pid, terminate the child.
restart_child(supervisor, pid, child_spec) | Given a supervisor pid, child pid, and a child specification, restart the child process and initialize it with the child specification.
count_children(supervisor) | Given the supervisor pid, return the number of child processes.
which_children(supervisor) | Given the supervisor pid, return the state of the supervisor.
Implementing the above API will give us a pretty good grasp of how the actual OTP Supervisor works under the hood.
Building Our Own Supervisor
As usual, we start with a new mix project. Since calling it Supervisor is unoriginal, and MySupervisor is boring, let's give it some Old English flair and call it ThySupervisor instead:
% mix new thy_supervisor
As a review of what you have learned so far, we are going to build our supervisor using the GenServer behavior. You might be surprised to learn that OTP's own supervisor behavior is, in fact, implemented on top of the GenServer behavior (Erlang's gen_server).
```elixir
defmodule ThySupervisor do
  use GenServer
end
```
start_link(child_spec_list)
The first thing is to implement start_link/1:
```elixir
defmodule ThySupervisor do
  use GenServer

  def start_link(child_spec_list) do
    GenServer.start_link(__MODULE__, [child_spec_list])
  end
end
```
This is the main entry point for creating a supervisor process. Here, we call GenServer.start_link/2 with the name of the module and a list containing a single element, child_spec_list. child_spec_list is a (potentially empty) list of child specifications.
This is a fancy way of telling the supervisor what kinds of processes it should manage. A child specification is a {module, function, arguments} tuple, so the list for two (similar) workers could look like [{ThyWorker, :start_link, []}, {ThyWorker, :start_link, []}].
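The article does not show ThyWorker itself. The following is a hypothetical sketch of one. It assumes that start_link/0 spawns a linked process with spawn_link/1 and returns the bare pid, which is the shape this supervisor expects, because start_child/1 checks the result with is_pid/1. The sketch also starts each child by applying its {module, function, args} tuple with apply/3:

```elixir
defmodule ThyWorker do
  # Hypothetical worker: start_link/0 returns a bare pid (not {:ok, pid}),
  # because the process is created with spawn_link/1.
  def start_link do
    spawn_link(fn -> loop() end)
  end

  # A trivial receive loop that keeps the process alive until told otherwise.
  defp loop do
    receive do
      :stop -> exit(:normal)
      :crash -> exit(:boom)
      _other -> loop()
    end
  end
end

# A list of two (similar) child specifications.
child_spec_list = [{ThyWorker, :start_link, []}, {ThyWorker, :start_link, []}]

# Starting a child is just apply/3 on its {module, function, args} tuple.
pids = Enum.map(child_spec_list, fn {mod, fun, args} -> apply(mod, fun, args) end)

# Each child is alive and linked to the process that started it.
{:links, links} = Process.info(self(), :links)
IO.inspect(Enum.all?(pids, &(&1 in links))) # prints true
```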
Recall that GenServer.start_link/2 expects the ThySupervisor.init/1 callback to be implemented. It passes its second argument (the list) into init/1. Let's do that:
Listing 1 thy_supervisor.ex – start_link/1 and the init/1 callback. Notice that exits are being trapped in the init/1 callback.
```elixir
defmodule ThySupervisor do
  use GenServer

  #######
  # API #
  #######

  def start_link(child_spec_list) do
    GenServer.start_link(__MODULE__, [child_spec_list])
  end

  ######################
  # Callback Functions #
  ######################

  def init([child_spec_list]) do
    Process.flag(:trap_exit, true) # ❶
    state = child_spec_list
            |> start_children
            |> Enum.into(HashDict.new)
    {:ok, state}
  end
end
```
❶ Make the supervisor process trap exits
The first thing we do here is to let the supervisor process trap exits. This is so that it can receive exit signals from its children as normal messages.
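Here is a minimal sketch of what trapping exits gives us, run outside any supervisor. The current process traps exits and then receives its linked children's exit signals as ordinary {:EXIT, pid, reason} messages, both for a crash and for a normal exit:

```elixir
# Turn exit signals from linked processes into ordinary messages.
Process.flag(:trap_exit, true)

# A linked child that crashes with a custom reason.
crasher = spawn_link(fn -> exit(:boom) end)

# A linked child that simply finishes its work.
finisher = spawn_link(fn -> :ok end)

# Both deaths arrive in our mailbox as {:EXIT, pid, reason} messages.
crash_reason =
  receive do
    {:EXIT, ^crasher, exit_reason} -> exit_reason
  after
    1_000 -> :timeout
  end

normal_reason =
  receive do
    {:EXIT, ^finisher, exit_reason} -> exit_reason
  after
    1_000 -> :timeout
  end

IO.inspect({crash_reason, normal_reason}) # prints {:boom, :normal}
```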
There is quite a bit going on in the rest of init/1. The child_spec_list is fed into start_children/1. This function, as you will soon see, spawns the child processes and returns a list of tuples. Each tuple is a pair that contains the pid of the newly spawned child and the child specification. For example:
[{#PID<0.82.0>, {ThyWorker, :start_link, []}}, {#PID<0.84.0>, {ThyWorker, :start_link, []}}]
This list is then fed into Enum.into/2. By passing in HashDict.new as the second argument, we are effectively transforming the list of tuples into a HashDict, with the pids of the child processes as the keys and the child specifications as the values.
Transforming an Enumerable into a Collectable with Enum.into
Enum.into/2 (and Enum.into/3, which takes an additional transformation function) takes an enumerable (like a List) and inserts its elements into a Collectable (like a HashDict). This is very helpful because when a HashDict collects a two-element tuple, the first element becomes the key and the second element becomes the value:
```elixir
iex> h = [{:pid1, {:mod1, :fun1, :arg1}}, {:pid2, {:mod2, :fun2, :arg2}}] |> Enum.into(HashDict.new)
```

This returns a HashDict:

```elixir
#HashDict<[pid2: {:mod2, :fun2, :arg2}, pid1: {:mod1, :fun1, :arg1}]>
```

The value stored under a key can be retrieved like so:

```elixir
iex> HashDict.fetch(h, :pid2)
{:ok, {:mod2, :fun2, :arg2}}
```
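HashDict has since been deprecated in Elixir in favor of maps. Assuming a recent Elixir version, the same Enum.into/2 transformation works with an empty map as the collectable. Enum.into/3 applies its transformation function to each element before inserting it:

```elixir
# The same list of {key, value} tuples as in the HashDict example.
pairs = [{:pid1, {:mod1, :fun1, :arg1}}, {:pid2, {:mod2, :fun2, :arg2}}]

# Collect into an empty map: each tuple's first element becomes the key.
h = Enum.into(pairs, %{})
IO.inspect(Map.fetch(h, :pid2)) # prints {:ok, {:mod2, :fun2, :arg2}}

# Enum.into/3 transforms each element first; here we keep only the module.
modules = Enum.into(pairs, %{}, fn {key, {mod, _fun, _arg}} -> {key, mod} end)
IO.inspect(Map.fetch(modules, :pid1)) # prints {:ok, :mod1}
```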
The resulting HashDict, which maps each pid to its child specification, forms the state of the supervisor process. We return it in an {:ok, state} tuple, which is what init/1 is expected to return.
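You can see the handshake between GenServer.start_link/2 and init/1 in isolation with a hypothetical InitDemo module. SomeWorker is only a placeholder atom, not a real module. The second argument to start_link is handed unchanged to init/1, and the state returned in {:ok, state} is what the server holds afterwards. :sys.get_state/1 is a standard OTP debugging function, used here only to inspect that state:

```elixir
defmodule InitDemo do
  # Hypothetical module that only demonstrates the start_link/init handshake.
  use GenServer

  def start_link(child_spec_list) do
    # The second argument is passed to init/1 unchanged.
    GenServer.start_link(__MODULE__, [child_spec_list])
  end

  @impl true
  def init([child_spec_list]) do
    # Whatever goes into {:ok, state} becomes the server's initial state.
    {:ok, %{specs: child_spec_list}}
  end
end

{:ok, pid} = InitDemo.start_link([{SomeWorker, :start_link, []}])

IO.inspect(:sys.get_state(pid)) # prints %{specs: [{SomeWorker, :start_link, []}]}
```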
start_child(supervisor, child_spec)
I have not yet described what goes on in the private function start_children/1, which is used in init/1. Let's skip ahead a little and look at start_child/2 first. This function takes the supervisor pid and a child specification, and attaches the child to the supervisor:
Listing 2 thy_supervisor.ex – Starting a single child process
```elixir
defmodule ThySupervisor do
  use GenServer

  #######
  # API #
  #######

  def start_child(supervisor, child_spec) do
    GenServer.call(supervisor, {:start_child, child_spec})
  end

  ######################
  # Callback Functions #
  ######################

  def handle_call({:start_child, child_spec}, _from, state) do
    case start_child(child_spec) do
      {:ok, pid} ->
        # Record the new child under its pid.
        new_state = state |> HashDict.put(pid, child_spec)
        {:reply, {:ok, pid}, new_state}
      :error ->
        {:reply, {:error, "error starting child"}, state}
    end
  end

  #####################
  # Private Functions #
  #####################

  defp start_child({mod, fun, args}) do
    case apply(mod, fun, args) do
      pid when is_pid(pid) -> {:ok, pid}
      _ -> :error
    end
  end
end
```
The start_child/2 API function makes a synchronous call to the supervisor. The request is a tuple containing the :start_child atom and the child specification. It is handled by the handle_call({:start_child, child_spec}, _from, state) callback, which attempts to start a new child process using the start_child/1 private function.

On success, the caller receives {:ok, pid} and the state of the supervisor is updated to new_state. Otherwise, the caller receives a tuple tagged with :error, along with a reason.
Supervisor and Spawning Child Processes with spawn_link
Here is an important point: we are making a large assumption. We assume that each created process is linked to the supervisor process. In practice, this means we assume that the process is spawned using spawn_link. In fact, the OTP Supervisor behavior makes the same assumption: a child's start function is expected to create the process and link it to the supervisor.
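Here is a minimal sketch of why the link matters, for a process that traps exits as our supervisor does. A child started with spawn_link reports its death as an :EXIT message. A child started with plain spawn dies without the parent ever hearing about it. The 500 ms timeout is an arbitrary choice for the demo:

```elixir
Process.flag(:trap_exit, true)

# Linked child: its death is reported to us as an :EXIT message.
linked = spawn_link(fn -> exit(:crashed) end)

# Unlinked child: as far as we are concerned, it dies silently.
unlinked = spawn(fn -> exit(:crashed) end)

# Wait for an exit message from a specific pid, or give up after a timeout.
wait_for_exit = fn pid ->
  receive do
    {:EXIT, ^pid, reason} -> {:exit, reason}
  after
    500 -> :no_message
  end
end

linked_result = wait_for_exit.(linked)
unlinked_result = wait_for_exit.(unlinked)

IO.inspect({linked_result, unlinked_result}) # prints {{:exit, :crashed}, :no_message}
```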
Starting child processes
Now we can look at the start_children/1 function, which is used in init/1. Here it is:
Listing 3 thy_supervisor.ex – Starting child processes
```elixir
defmodule ThySupervisor do
  # ...

  #####################
  # Private Functions #
  #####################

  # Start each child in turn, pairing its pid with its specification.
  defp start_children([child_spec | rest]) do
    case start_child(child_spec) do
      {:ok, pid} ->
        [{pid, child_spec} | start_children(rest)]
      :error ->
        :error
    end
  end

  defp start_children([]), do: []
end
```
The start_children/1 function takes a list of child specifications and hands each one to start_child/1, accumulating a list of tuples along the way. As you saw earlier, each tuple is a pair that contains the pid and the child specification.
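How does start_child/1 do its work? It turns out there isn't a lot of sophisticated machinery involved. It applies the module, function, and arguments from the child specification, and checks that the result is a pid: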
```elixir
defp start_child({mod, fun, args}) do
  case apply(mod, fun, args) do
    pid when is_pid(pid) -> {:ok, pid}
    _ -> :error
  end
end
```
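To check what start_child/1 accepts, here is a standalone copy of its logic in a hypothetical StartChildDemo module, made public only so that it can be called directly. A specification whose function returns a bare pid (here :erlang.spawn_link/1) is accepted. A specification whose function returns {:ok, pid} (here Agent.start_link/1) comes back as :error:

```elixir
defmodule StartChildDemo do
  # Same logic as ThySupervisor's start_child/1, made public for this demo.
  def start_child({mod, fun, args}) do
    case apply(mod, fun, args) do
      pid when is_pid(pid) -> {:ok, pid}
      _ -> :error
    end
  end
end

# :erlang.spawn_link/1 returns a bare pid, so this spec is accepted.
bare = StartChildDemo.start_child({:erlang, :spawn_link, [fn -> Process.sleep(:infinity) end]})

# Agent.start_link/1 returns {:ok, pid}, which this start_child/1 rejects.
tagged = StartChildDemo.start_child({Agent, :start_link, [fn -> 0 end]})

{:ok, child_pid} = bare
IO.inspect(is_pid(child_pid)) # prints true
IO.inspect(tagged)            # prints :error
```

This follows from the spawn_link assumption described earlier. Supporting GenServer-style start functions would take an extra {:ok, pid} clause, which is a hypothetical extension not made here.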
terminate_child(supervisor, pid)
The supervisor needs a way to terminate its children. Here’s the API, callback and private function implementation:
Listing 4 thy_supervisor.ex – Terminating a single child process
```elixir
defmodule ThySupervisor do
  use GenServer

  #######
  # API #
  #######

  def terminate_child(supervisor, pid) when is_pid(pid) do
    GenServer.call(supervisor, {:terminate_child, pid})
  end

  ######################
  # Callback Functions #
  ######################

  def handle_call({:terminate_child, pid}, _from, state) do
    case terminate_child(pid) do
      :ok ->
        # Forget the child once it has been killed.
        new_state = state |> HashDict.delete(pid)
        {:reply, :ok, new_state}
      :error ->
        {:reply, {:error, "error terminating child"}, state}
    end
  end

  #####################
  # Private Functions #
  #####################

  defp terminate_child(pid) do
    Process.exit(pid, :kill)
    :ok
  end
end
```
We use Process.exit(pid, :kill) to terminate the child process. Remember how we set the supervisor to trap exits? When a child is forcibly killed using Process.exit(pid, :kill), the supervisor receives a message of the form {:EXIT, pid, :killed}. To handle this message, we use the handle_info/2 callback:
Listing 5 thy_supervisor.ex – :EXIT messages are handled via the handle_info/2 callback
```elixir
def handle_info({:EXIT, from, :killed}, state) do
  new_state = state |> HashDict.delete(from)
  {:noreply, new_state}
end
```
All we need to do is update the supervisor's state by removing the child's entry from the HashDict, and return the appropriate tuple from the callback.
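The :killed atom in that pattern is not a typo. A quick sketch shows that when a linked child is killed with Process.exit(pid, :kill), a process that traps exits sees the reason as :killed rather than :kill:

```elixir
Process.flag(:trap_exit, true)

# A linked child that would otherwise run forever.
child = spawn_link(fn -> Process.sleep(:infinity) end)

# :kill is an untrappable exit signal, so the child dies unconditionally.
Process.exit(child, :kill)

# The linked, exit-trapping parent observes the reason as :killed.
reason =
  receive do
    {:EXIT, ^child, exit_reason} -> exit_reason
  after
    1_000 -> :timeout
  end

IO.inspect(reason) # prints :killed
```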
restart_child(supervisor, pid, child_spec)
Sometimes it is helpful to restart a child process manually. To restart a child process, we need to supply both the process id and the child specification. Why not just the process id? You might want to restart the child with different arguments, and those have to go into the child specification.
The restart_child/2 private function is a combination of terminate_child/1 and start_child/1.
Listing 6 thy_supervisor.ex – Restarting a child process
```elixir
defmodule ThySupervisor do
  use GenServer

  #######
  # API #
  #######

  def restart_child(supervisor, pid, child_spec) when is_pid(pid) do
    GenServer.call(supervisor, {:restart_child, pid, child_spec})
  end

  ######################
  # Callback Functions #
  ######################

  def handle_call({:restart_child, old_pid, child_spec}, _from, state) do
    # Only restart children that this supervisor knows about.
    case HashDict.fetch(state, old_pid) do
      {:ok, _old_child_spec} ->
        case restart_child(old_pid, child_spec) do
          {:ok, {pid, child_spec}} ->
            new_state = state
                        |> HashDict.delete(old_pid)
                        |> HashDict.put(pid, child_spec)
            {:reply, {:ok, pid}, new_state}
          :error ->
            {:reply, {:error, "error restarting child"}, state}
        end
      _ ->
        {:reply, :ok, state}
    end
  end

  #####################
  # Private Functions #
  #####################

  defp restart_child(pid, child_spec) when is_pid(pid) do
    case terminate_child(pid) do
      :ok ->
        case start_child(child_spec) do
          {:ok, new_pid} ->
            {:ok, {new_pid, child_spec}}
          :error ->
            :error
        end
      :error ->
        :error
    end
  end
end
```
count_children(supervisor)
This function returns the number of children linked to the supervisor. The implementation is straightforward:
Listing 7 thy_supervisor.ex – Counting the number of child processes
```elixir
defmodule ThySupervisor do
  use GenServer

  #######
  # API #
  #######

  def count_children(supervisor) do
    GenServer.call(supervisor, :count_children)
  end

  ######################
  # Callback Functions #
  ######################

  def handle_call(:count_children, _from, state) do
    {:reply, HashDict.size(state), state}
  end
end
```
which_children(supervisor)
This is similar to count_children/1's implementation. Because our implementation is simple, it is fine to return the entire state:
Listing 8 thy_supervisor.ex – A simplistic implementation of which_children/1 that returns the entire state of the supervisor
```elixir
defmodule ThySupervisor do
  use GenServer

  #######
  # API #
  #######

  def which_children(supervisor) do
    GenServer.call(supervisor, :which_children)
  end

  ######################
  # Callback Functions #
  ######################

  def handle_call(:which_children, _from, state) do
    {:reply, state, state}
  end
end
```
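For comparison, Elixir's built-in Supervisor module has functions with the same names but richer return values. This sketch assumes a recent Elixir version. It starts a supervisor with two Agent children and inspects it. The children get distinct ids through Supervisor.child_spec/2, because both would otherwise default to the id Agent:

```elixir
children = [
  # Give each Agent child a unique id so both specs are accepted.
  Supervisor.child_spec({Agent, fn -> 0 end}, id: :counter_a),
  Supervisor.child_spec({Agent, fn -> 0 end}, id: :counter_b)
]

{:ok, sup} = Supervisor.start_link(children, strategy: :one_for_one)

# count_children/1 returns a map of counts rather than a single integer.
counts = Supervisor.count_children(sup)

# which_children/1 returns {id, pid, type, modules} tuples, not raw state.
ids =
  sup
  |> Supervisor.which_children()
  |> Enum.map(fn {id, _pid, _type, _modules} -> id end)
  |> Enum.sort()

IO.inspect(counts.workers) # prints 2
IO.inspect(ids)            # prints [:counter_a, :counter_b]
```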
terminate(reason, state)
This callback is invoked when the supervisor process is about to shut down. Before the supervisor process terminates, we need to terminate all the children linked to it. The terminate_children/1 private function handles this:
Listing 9 thy_supervisor.ex – Terminating the supervisor involves terminating the child processes first
```elixir
defmodule ThySupervisor do
  use GenServer

  ######################
  # Callback Functions #
  ######################

  def terminate(_reason, state) do
    terminate_children(state)
    :ok
  end

  #####################
  # Private Functions #
  #####################

  defp terminate_children([]) do
    :ok
  end

  # Kill every child recorded in the state (a collection of {pid, child_spec} pairs).
  defp terminate_children(child_specs) do
    child_specs |> Enum.each(fn {pid, _} -> terminate_child(pid) end)
  end

  defp terminate_child(pid) do
    Process.exit(pid, :kill)
    :ok
  end
end
```
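Here is a hedged sketch of the same cleanup idea in a hypothetical TerminateDemo server that owns one linked child. GenServer.stop/1 invokes terminate/2 and returns only after the server has exited. Trapping exits also makes terminate/2 run when the parent shuts the server down with an exit signal. A monitor confirms that the child was killed:

```elixir
defmodule TerminateDemo do
  # Hypothetical server that owns a single linked child process.
  use GenServer

  def start_link(_arg), do: GenServer.start_link(__MODULE__, :ok)

  @impl true
  def init(:ok) do
    # Trapping exits also lets terminate/2 run on a parent's exit signal.
    Process.flag(:trap_exit, true)
    child = spawn_link(fn -> Process.sleep(:infinity) end)
    {:ok, child}
  end

  @impl true
  def handle_call(:child, _from, child), do: {:reply, child, child}

  @impl true
  def terminate(_reason, child) do
    # Clean up the child before the server itself exits.
    Process.exit(child, :kill)
    :ok
  end
end

{:ok, server} = TerminateDemo.start_link(nil)
child = GenServer.call(server, :child)
ref = Process.monitor(child)

# stop/1 is synchronous and triggers terminate/2 on the server.
:ok = GenServer.stop(server)

down_reason =
  receive do
    {:DOWN, ^ref, :process, ^child, r} -> r
  after
    1_000 -> :timeout
  end

IO.inspect(down_reason) # prints :killed
```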
Handling Crashes
I’ve saved the best for last. What happens when one of the child processes crashes? Because the supervisor traps exits, it receives a message of the form {:EXIT, pid, reason}. Once again, we use the handle_info/2 callback to handle the exit messages.
There are two cases to consider, other than :killed, which we already handle with the handle_info/2 clause from Listing 5.
The first case is when the process exited normally. The supervisor doesn’t have to do anything in this case, except update its state:
Listing 10 thy_supervisor.ex – Do nothing when a child process exits normally
```elixir
def handle_info({:EXIT, from, :normal}, state) do
  new_state = state |> HashDict.delete(from)
  {:noreply, new_state}
end
```
The second case is when the process has exited abnormally and hasn’t been forcibly killed. In that case, the supervisor should automatically restart the failed process:
Listing 11 thy_supervisor.ex – Restart a child process automatically if it exits for an abnormal reason
```elixir
def handle_info({:EXIT, old_pid, _reason}, state) do
  case HashDict.fetch(state, old_pid) do
    {:ok, child_spec} ->
      # Restart the crashed child using its stored specification.
      case restart_child(old_pid, child_spec) do
        {:ok, {pid, child_spec}} ->
          new_state = state
                      |> HashDict.delete(old_pid)
                      |> HashDict.put(pid, child_spec)
          {:noreply, new_state}
        :error ->
          {:noreply, state}
      end
    _ ->
      {:noreply, state}
  end
end
```
The function above is nothing new. It is almost the same as the implementation of restart_child, except that the existing child specification is reused instead of being supplied by the caller.
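The three-way dispatch on the exit reason is easy to try without GenServer. This hypothetical RestartDemo sketch runs the loop in the calling process. Unlike ThySupervisor, it adds a restart limit so that a child that always crashes cannot be restarted forever. OTP's Supervisor offers the :max_restarts and :max_seconds options for the same purpose:

```elixir
defmodule RestartDemo do
  # Hypothetical mini-supervisor. `spec` is a {module, function, args} tuple
  # whose function returns a bare pid linked to the caller.
  def supervise(spec, max_restarts) do
    Process.flag(:trap_exit, true)
    loop(spec, start(spec), max_restarts, [])
  end

  defp start({mod, fun, args}), do: apply(mod, fun, args)

  # Waits for the current child's exit and returns a log of what happened.
  defp loop(spec, pid, restarts_left, log) do
    receive do
      {:EXIT, ^pid, :normal} ->
        # Normal exit: nothing to restart.
        Enum.reverse([:normal | log])

      {:EXIT, ^pid, :killed} ->
        # Deliberately killed: do not restart.
        Enum.reverse([:killed | log])

      {:EXIT, ^pid, reason} when restarts_left > 0 ->
        # Abnormal exit: start a fresh child from the same spec.
        loop(spec, start(spec), restarts_left - 1, [{:restarted_after, reason} | log])

      {:EXIT, ^pid, reason} ->
        # Out of restarts: stop supervising.
        Enum.reverse([{:gave_up, reason} | log])
    end
  end
end

crasher = {:erlang, :spawn_link, [fn -> exit(:boom) end]}
finisher = {:erlang, :spawn_link, [fn -> :ok end]}

# Returns [{:restarted_after, :boom}, {:restarted_after, :boom}, {:gave_up, :boom}]
crash_log = RestartDemo.supervise(crasher, 2)

# Returns [:normal]
normal_log = RestartDemo.supervise(finisher, 2)
```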
Full Completed Source
Here is the full source of our hand-rolled supervisor in all its glory:
Listing 12 The full implementation of thy_supervisor.ex
```elixir
defmodule ThySupervisor do
  use GenServer

  #######
  # API #
  #######

  def start_link(child_spec_list) do
    GenServer.start_link(__MODULE__, [child_spec_list])
  end

  def start_child(supervisor, child_spec) do
    GenServer.call(supervisor, {:start_child, child_spec})
  end

  def terminate_child(supervisor, pid) when is_pid(pid) do
    GenServer.call(supervisor, {:terminate_child, pid})
  end

  def restart_child(supervisor, pid, child_spec) when is_pid(pid) do
    GenServer.call(supervisor, {:restart_child, pid, child_spec})
  end

  def count_children(supervisor) do
    GenServer.call(supervisor, :count_children)
  end

  def which_children(supervisor) do
    GenServer.call(supervisor, :which_children)
  end

  ######################
  # Callback Functions #
  ######################

  def init([child_spec_list]) do
    Process.flag(:trap_exit, true)
    state = child_spec_list
            |> start_children
            |> Enum.into(HashDict.new)
    {:ok, state}
  end

  def handle_call({:start_child, child_spec}, _from, state) do
    case start_child(child_spec) do
      {:ok, pid} ->
        new_state = state |> HashDict.put(pid, child_spec)
        {:reply, {:ok, pid}, new_state}
      :error ->
        {:reply, {:error, "error starting child"}, state}
    end
  end

  def handle_call({:terminate_child, pid}, _from, state) do
    case terminate_child(pid) do
      :ok ->
        new_state = state |> HashDict.delete(pid)
        {:reply, :ok, new_state}
      :error ->
        {:reply, {:error, "error terminating child"}, state}
    end
  end

  def handle_call({:restart_child, old_pid, child_spec}, _from, state) do
    case HashDict.fetch(state, old_pid) do
      {:ok, _old_child_spec} ->
        case restart_child(old_pid, child_spec) do
          {:ok, {pid, child_spec}} ->
            new_state = state
                        |> HashDict.delete(old_pid)
                        |> HashDict.put(pid, child_spec)
            {:reply, {:ok, pid}, new_state}
          :error ->
            {:reply, {:error, "error restarting child"}, state}
        end
      _ ->
        {:reply, :ok, state}
    end
  end

  def handle_call(:count_children, _from, state) do
    {:reply, HashDict.size(state), state}
  end

  def handle_call(:which_children, _from, state) do
    {:reply, state, state}
  end

  def handle_info({:EXIT, from, :normal}, state) do
    new_state = state |> HashDict.delete(from)
    {:noreply, new_state}
  end

  def handle_info({:EXIT, from, :killed}, state) do
    new_state = state |> HashDict.delete(from)
    {:noreply, new_state}
  end

  def handle_info({:EXIT, old_pid, _reason}, state) do
    case HashDict.fetch(state, old_pid) do
      {:ok, child_spec} ->
        case restart_child(old_pid, child_spec) do
          {:ok, {pid, child_spec}} ->
            new_state = state
                        |> HashDict.delete(old_pid)
                        |> HashDict.put(pid, child_spec)
            {:noreply, new_state}
          :error ->
            {:noreply, state}
        end
      _ ->
        {:noreply, state}
    end
  end

  def terminate(_reason, state) do
    terminate_children(state)
    :ok
  end

  #####################
  # Private Functions #
  #####################

  defp start_children([child_spec | rest]) do
    case start_child(child_spec) do
      {:ok, pid} ->
        [{pid, child_spec} | start_children(rest)]
      :error ->
        :error
    end
  end

  defp start_children([]), do: []

  defp start_child({mod, fun, args}) do
    case apply(mod, fun, args) do
      pid when is_pid(pid) -> {:ok, pid}
      _ -> :error
    end
  end

  defp terminate_children([]) do
    :ok
  end

  defp terminate_children(child_specs) do
    child_specs |> Enum.each(fn {pid, _} -> terminate_child(pid) end)
  end

  defp terminate_child(pid) do
    Process.exit(pid, :kill)
    :ok
  end

  defp restart_child(pid, child_spec) when is_pid(pid) do
    case terminate_child(pid) do
      :ok ->
        case start_child(child_spec) do
          {:ok, new_pid} ->
            {:ok, {new_pid, child_spec}}
          :error ->
            :error
        end
      :error ->
        :error
    end
  end
end
```