神刀安全网

Process registry in Elixir: A practical example

Process registry in Elixir: A practical example
I’m sure I left that process somewhere around here.

Processes in Elixir (and Erlang , for that matter) are identified with a unique process id, the pid .

That’s what we use to interact with them. We send a message to a pid and the VM takes care of delivering it to the correct process. Sometimes, though, relying on the pid of a process can be problematic, or not even possible.

We may be holding the pid of a process that is already dead, or we may be using a Supervisor that abstracts the process creation so we don’t even know its pid.

Let’s create a simple application to see what issues we can have and what are some ways to solve them.

Starting with no registry at all

For this example we will create a simple chat application. Let’s go ahead and create a new mix project for it:

$ mix new chat

And we can create a pretty standard GenServer that will be used throughout these examples:

# in lib/chat/server.ex
defmodule Chat.Server do
use GenServer
# API
def start_link do
GenServer.start_link(__MODULE__, [])
end
def add_message(pid, message) do
GenServer.cast(pid, {:add_message, message})
end
def get_messages(pid) do
GenServer.call(pid, :get_messages)
end
# SERVER
def init(messages) do
{:ok, messages}
end
def handle_cast({:add_message, new_message}, messages) do
{:noreply, [new_message | messages]}
end
def handle_call(:get_messages, _from, messages) do
{:reply, messages, messages}
end
end

If this still does not look familiar to you, Elixir’s getting started guides has a great introduction to OTP that is worth checking.

And we can now start an iex session to test this server:

$ iex -S mix
iex> {:ok, pid} = Chat.Server.start_link
{:ok, #PID<0.107.0>}
iex> Chat.Server.add_message(pid, "foo")
:ok
iex> Chat.Server.add_message(pid, "bar")
:ok
iex> Chat.Server.get_messages(pid)
["bar", "foo"]

Take a look at the entire code here .

So far so good. We get a pid when we start this process, and then for every message we want to send ( add_message/2 and get_messages/1 ) we pass this pid and everything works as expected.

Things start to get more interesting when we introduce a Supervisor.

Introducing a Supervisor

If for some reason our Chat.Server process dies, we are left there, sad and alone in our iex session, without any choice other than manually starting a new process and sending messages to this new pid . Let’s create a Supervisor so we don’t need to worry about that.

# in lib/chat/supervisor.ex
defmodule Chat.Supervisor do
use Supervisor
def start_link do
Supervisor.start_link(__MODULE__, [])
end
def init(_) do
children = [
worker(Chat.Server, [])
]
supervise(children, strategy: :one_for_one)
end
end

Creating a Supervisor is simple enough, but now we have a problem if we try to follow the same approach we did before. We are not starting the Chat.Server process ourselves, the Supervisor is taking care of that and we just don’t have access to the pid of the processes that the Supervisor creates.

This is a property of the Supervisor pattern. You can’t have access to its children’s pid as it will, when necessary, restart these processes (which actually means it will kill and start a new process, with a different pid ).

Registering a process name

To access our Chat.Server process we need some way to reference it using something other than the pid , a reference that will be the same even if the process is restarted by the Supervisor. We need to give it a name.

So let’s change Chat.Server :

# lib/chat/server.ex
defmodule Chat.Server do
use GenServer
def start_link do
# We now start the GenServer with a `name` option.
GenServer.start_link(__MODULE__, [], name: :chat_room)
end
# And our function doesn't need to receive the pid anymore,
# as we can reference the process with its unique name.
def add_message(message) do
GenServer.cast(:chat_room, {:add_message, message})
end
def get_messages do
GenServer.call(:chat_room, :get_messages)
end
# ...
end

Check out the diff here .

And it should continue working the same way, except we don’t need to pass the pid around anymore:

$ iex -S mix
iex> Chat.Supervisor.start_link
{:ok, #PID<0.94.0>}
iex> Chat.Server.add_message("foo")
:ok
iex> Chat.Server.add_message("bar")
:ok
iex> Chat.Server.get_messages
["bar", "foo"]

And if the process is restarted we should be able to access it in the same way:

iex> Process.whereis(:chat_room)
#PID<0.111.0>
iex> Process.whereis(:chat_room) |> Process.exit(:kill)
true
iex> Process.whereis(:chat_room)
#PID<0.114.0>
iex> Chat.Server.add_message "foo"
:ok
iex> Chat.Server.get_messages
["foo"]

That will do the job for our current scenario, but let’s try to make things a bit more complex (and real).

Dynamic process creation

Imagine we want to support multiple chat rooms. A client will start a new room with a name, and should be able to send messages to any room she wants. The interface would be something like this:

iex> Chat.Supervisor.start_room("first room")
iex> Chat.Supervisor.start_room("second room")
iex> Chat.Server.add_message("first room", "foo")
iex> Chat.Server.add_message("second room", "bar")
iex> Chat.Server.get_messages("first room")
["foo"]
iex> Chat.Server.get_messages("second room")
["bar"]

Let’s start by changing the Supervisor to support that:

# lib/chat/supervisor.ex
defmodule Chat.Supervisor do
use Supervisor
def start_link do
# We are now registering our supervisor process with a name
# so we can reference it in the `start_room/1` function
Supervisor.start_link(__MODULE__, [], name: :chat_supervisor)
end
def start_room(name) do
# And we use `start_child/2` to start a new Chat.Server process
Supervisor.start_child(:chat_supervisor, [name])
end
def init(_) do
children = [
worker(Chat.Server, [])
]
# We also changed the `strategty` to `simple_one_for_one`.
# With this strategy, we define just a "template" for a child,
# no process is started during the Supervisor initialization,
# just when we call `start_child/2`
supervise(children, strategy: :simple_one_for_one)
end
end

And let’s make Chat.Server accept a name in the start_link function:

# lib/chat/server.ex
defmodule Chat.Server do
use GenServer
# Just accept a `name` parameter here for now
def start_link(name) do
GenServer.start_link(__MODULE__, [], name: :chat_room)
end
#...
end

Check out the entire diff .

The problem now is that, as we can have a bunch of Chat.Server processes, we can’t call all of them  :chat_room.

$ iex -S mix
iex> Chat.Supervisor.start_link
{:ok, #PID<0.107.0>}
iex> Chat.Supervisor.start_room "foo"
{:ok, #PID<0.109.0>}
iex> Chat.Supervisor.start_room "bar"
{:error, {:already_started, #PID<0.109.0>}}

Fair enough. When we try to start the second process it fails because a process named :chat_room is already started. We need to register these processes in another way.

The name option is quite restrictive to what it will accept, though, we can’t have a name like {:chat_room, "room name"} . To quote the documentation :

The supported values are:
an atom — the GenServer is registered locally with the given name using Process.register/2.
{:global, term} — the GenServer is registered globally with the given term using the functions in the :global module.
{:via, module, term} — the GenServer is registered with the given mechanism and name.

The first option, an atom, is what we have been using so far and we know it’s now enough for our needs now.

The second option is used to register a process globally, across multiple nodes, and it relies on a local ETS table. This also means it requires synchronization across the entire cluster, which introduces some unnecessary overhead unless you actually need this behavior.

The third and last option is using what is called a via tuple , and that’s exactly what we need to solve our problem. That’s what the documentation says about it:

The :via option expects a module that exports register_name/2, unregister_name/1, whereis_name/1 and send/2.

It’s hard to understand what this means without an example, so let’s see this in action.

Using via tuple

via tuple is basically a way to tell Elixir that we are going to use a custom module to register our processes. It expects this module to know how to do a few things:

  • How to register a name, that can be any Elixir term, using the function register_name/2 ;
  • How to unregister a name, using the function unregister_name/1 ;
  • How to find the pid of a process with a given name, using whereis_name/1 ;
  • And, finally, how to send a message to a given process, with send/2 .

For this to work, these functions are expected to return a response in a specific format, the same way OTP expects our handle_call/3 , handle_cast/2 and friends to follow some rules.

So let’s implement a module that knows how to do that:

# in lib/chat/registry.ex
defmodule Chat.Registry do
use GenServer
# API
def start_link do
# We register our registry (yeah, I know) with a simple name,
# just so we can reference it in the other functions.
GenServer.start_link(__MODULE__, nil, name: :registry)
end
def whereis_name(room_name) do
GenServer.call(:registry, {:whereis_name, room_name})
end
def register_name(room_name, pid) do
GenServer.call(:registry, {:register_name, room_name, pid})
end
def unregister_name(room_name) do
GenServer.cast(:registry, {:unregister_name, room_name})
end
def send(room_name, message) do
# If we try to send a message to a process
# that is not registered, we return a tuple in the format
# {:badarg, {process_name, error_message}}.
# Otherwise, we just forward the message to the pid of this
# room.

case whereis_name(room_name) do
:undefined ->
{:badarg, {room_name, message}}
pid ->
Kernel.send(pid, message)
pid
end
end
# SERVER
def init(_) do
# We will use a simple Map to store our processes in
# the format %{"room name" => pid}
{:ok, Map.new}
end
def handle_call({:whereis_name, room_name}, _from, state) do
{:reply, Map.get(state, room_name, :undefined), state}
end
def handle_call({:register_name, room_name, pid}, _from, state) do
# Registering a name is just a matter of putting it in our Map.
# Our response tuple include a `:no` or `:yes` indicating if
# the process was included or if it was already present.
case Map.get(state, room_name) do
nil ->
{:reply, :yes, Map.put(state, room_name, pid)}
_ ->
{:reply, :no, state}
end
end
def handle_cast({:unregister_name, room_name}, state) do
# And unregistering is as simple as deleting an entry
# from our Map

{:noreply, Map.delete(state, room_name)}
end
end

Again, it’s up to us to decide how this registry is going to work. Here we are using a simple Map to relate the room name with its pid .

The implementation is straightforward if you are familiar with how a GenServer works (except for the not so usual return values).

Let’s try that on iex :

$ iex -S mix
iex> {:ok, pid} = Chat.Server.start_link("room1")
{:ok, #PID<0.107.0>}
iex> Chat.Registry.start_link
{:ok, #PID<0.109.0>}
iex> Chat.Registry.whereis_name("room1")
:undefined
iex> Chat.Registry.register_name("room1", pid)
:yes
iex> Chat.Registry.register_name("room1", pid)
:no
iex> Chat.Registry.whereis_name("room1")
#PID<0.107.0>
iex> Chat.Registry.unregister_name("room1")
:ok
iex> Chat.Registry.whereis_name("room1")
:undefined

The registry is working fine. We can register, unregister and find processes using their names, so let’s start using it.

Our original problem was that we now can have multiple Chat.Server processes that are initialized by a Supervisor. In order to send a message to a specific room, we want to call Chat.Server.add_message(“room1”, “my message”) , so we need to register our rooms with names like {:chat_room, “room1”} and {:chat_room, “room2”} . Here’s how our via tuple implementation makes it possible:

# in lib/chat/server.ex
defmodule Chat.Server do
use GenServer
# API
def start_link(name) do
# Instead of passing an atom to the `name` option, we send
# a tuple. Here we extract this tuple to a private method
# called `via_tuple` that can be reused in every function
GenServer.start_link(__MODULE__, [], name: via_tuple(name))
end
def add_message(room_name, message) do
# And the `GenServer` callbacks will accept this tuple the
# same way it accepts a pid or an atom.

GenServer.cast(via_tuple(room_name), {:add_message, message})
end
def get_messages(room_name) do
GenServer.call(via_tuple(room_name), :get_messages)
end
defp via_tuple(room_name) do
# And the tuple always follow the same format:
# {:via, module_name, term}
{:via, Chat.Registry, {:chat_room, room_name}}
end
# SERVER (no changes required here)
# ...
end

Check out the diff .

What happens here is that every time we send a message to Chat.Server passing a room name, it will find the pid of the process we want via the module we are providing (in this case, Chat.Registry ).

And this solves our problem, we can have as many Chat.Server processes as we want and we never need to know their pids .

There is still a big problem with this solution. Our registry never knows about processes that crashed and had to be restarted by the Supervisor, and that means that when this happens the registry will hold a pid that is not valid anymore.

Solving this issue should not be too hard, though. We will make our registry monitor all the processes it is taking care of, and when one of them crashes, we can safely remove it from our Map .

# in lib/chat/registry.ex
defmodule Chat.Registry do
# ...
def handle_call({:register_name, room_name, pid}, _from, state) do
case Map.get(state, room_name) do
nil ->
# When a new process is registered, we start monitoring it.
Process.monitor(pid)
{:reply, :yes, Map.put(state, room_name, pid)}
_ ->
{:reply, :no, state}
end
end
def handle_info({:DOWN, _, :process, pid, _}, state) do
# When a monitored process dies, we will receive a
# `:DOWN` message that we can use to remove the
# dead pid from our registry.

{:noreply, remove_pid(state, pid)}
end
def remove_pid(state, pid_to_remove) do
# And here we just filter out the dead pid
remove = fn {_key, pid} -> pid != pid_to_remove end
Enum.filter(state, remove) |> Enum.into(%{})
end
end

Check out the diff .

And let’s make sure it works:

$ iex -S mix
iex> Chat.Registry.start_link
{:ok, #PID<0.107.0>}
iex> Chat.Supervisor.start_link
{:ok, #PID<0.109.0>}
iex> Chat.Supervisor.start_room("room1")
{:ok, #PID<0.111.0>}
iex> Chat.Server.add_message("room1", "message")
:ok
iex> Chat.Server.get_messages("room1")
["message"]
iex> Chat.Registry.whereis_name({:chat_room, "room1"}) |> Process.exit(:kill)
true
iex> Chat.Server.add_message("room1", "message")
:ok
iex> Chat.Server.get_messages("room1")
["message"]

And now it doesn’t matter how many times the Supervisor restarts a Chat.Server process, when we send a message to a room it will always find the correct pid .

Simplifying with gproc

This is as far as we will go with our example, but I just want to show a tool that helps us to simplify our via tuple registry. That is gproc , an Erlang library.

Instead of telling Elixir to find the Chat.Server process via our Chat.Registry module, we will tell it to find this process via gproc , and then we should be able to get rid of Chat.Registry entirely.

Let’s start by adding this dependency on mix.exs :

# in mix.exs
defmodule Chat.Mixfile do
# ...
def application do
[applications: [:logger, :gproc]]
end
defp deps do
[{:gproc, "0.3.1"}]
end
end

And then running mix deps.get to fetch the new gproc dependency.

With that in place, we should be able to change our via tuple definition to make it use gproc instead of Chat.Registry :

# in lib/chat/server.ex
defmodule Chat.Server do
# ...
# The only thing we need to change is the `via_tuple/1` function,
# to make it use `gproc` instead of `Chat.Registry`
defp via_tuple(room_name) do
{:via, :gproc, {:n, :l, {:chat_room, room_name}}}
end
# ...
end

gproc requires the keys to be tuples with three values, in the form {type, scope, key}.

Here we are using :n (for name , meaning that we can’t have more than one process registered under a given key) as the type and  :l (for local , meaning that the process is not registered across the entire cluster of nodes) as the scope. The key can be any Elixir term we want (e.g. {:chat_room, “room1”} ). I won’t get into the details of all the possible gproc values, but you can check that in the documentation .

With this change, we can now remove the Chat.Registry module completely, and check that things are still working in the same way:

$ iex -S mix
iex> Chat.Supervisor.start_link
{:ok, #PID<0.190.0>}
iex> Chat.Supervisor.start_room("room1")
{:ok, #PID<0.192.0>}
iex> Chat.Supervisor.start_room("room2")
{:ok, #PID<0.194.0>}
iex> Chat.Server.add_message("room1", "first message")
:ok
iex> Chat.Server.add_message("room2", "second message")
:ok
iex> Chat.Server.get_messages("room1")
["first message"]
iex> Chat.Server.get_messages("room2")
["second message"]
iex> :gproc.where({:n, :l, {:chat_room, "room1"}}) |> Process.exit(:kill)
true
iex> Chat.Server.add_message("room1", "first message")
:ok
iex> Chat.Server.get_messages("room1")
["first message"]

Check out all the changes necessary to start using gproc .

Some takeaways and where to go from here

We covered a lot of ground. The main takeaways are:

  • Be careful when dealing with pids directly, as they will change when a process is restarted and you may be holding a dead one;
  • When you need to reference a single process (like when we had just one chat room), registering this process with an atom name is usually enough;
  • When you need to create processes dynamically (e.g. for multiple chat rooms), you can use via tuple to provide a custom registry ;
  • There are tools out there (like gproc, that we used in our example) that will help you with that so you don’t need to roll your own registry module.

That’s not all, though. If you need global registration across all the nodes in a cluster, some other things should be considered as well. Erlang has a global module for global process registration, pg2 for process groups, and even gproc can help with that.

If this post piqued your interest, you should definitely check out Elixir in Action , by Saša Jurić .

In this respository you can find all the code we wrote in this example.

转载本站任何文章请注明:转载至神刀安全网,谢谢神刀安全网 » Process registry in Elixir: A practical example

分享到:更多 ()

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址
分享按钮