Riak Core on Partisan on Elixir Tutorial: Getting Started
Previous posts: Riak Core on Partisan on Elixir Tutorial: Introduction and Riak Core on Partisan on Elixir Tutorial: Setup.
Creating the project
The project at this stage is available in the tag start-1
Add uniendo dependency
uniendo is a library that collects all the required dependencies and overrides in a single place so they are easy to use from other projects. Add it to the deps section in mix.exs like this:
defp deps do
  [
    {:uniendo, git: "https://github.com/marianoguerra/uniendo.git", branch: "master"}
  ]
end
The project at this stage is available in the tag start-2
Add application configuration
We need to add riak_core to the list of applications to start, and to start our own supervision tree. Modify the application section in mix.exs like this:
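A minimal sketch of that section, assuming the layout that mix new generates on Elixir 1.9 and that Civile (the module in lib/civile.ex) is the application callback module. Listing riak_core under extra_applications is one way to make sure it starts before our application; it is an assumption, not necessarily the exact form used in the start-3 tag.

```elixir
# mix.exs (excerpt). Sketch only: assumes a project generated with `mix new civile`.
def application do
  [
    # Start logger and riak_core (plus its own dependencies) before our application.
    extra_applications: [:logger, :riak_core],
    # Application callback module; its start/2 starts Civile.Supervisor.
    mod: {Civile, []}
  ]
end
```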
The project at this stage is available in the tag start-3
First build
Let's try building the project as is, mainly to check that the dependencies compile correctly.
First we need to fetch the dependencies:

mix deps.get
If it's the first time, it may ask you:
Could not find Hex, which is needed to build dependency :folsom
Shall I install Hex? (if running non-interactively, use "mix local.hex --force") [Yn]
Just hit enter to continue.
When it finishes, try to build the project:

mix compile
The project at this stage is available in the tag start-4
If it's the first time you build a project with dependencies written in Erlang, it may ask you:
Shall I install rebar3? (if running non-interactively, use "mix local.rebar --force") [Yn]
Just hit enter to continue.
If you get an error while compiling, you may want to try updating the rebar3 version Mix uses with:

mix local.rebar --force

If the setup dependency fails to build, try commenting out the line in deps/setup/rebar.config:
Like this:
Making it start
To start our application and riak_core, we first need some initial setup:
The project at this stage is available in the tag start-5
Add the following content to config/config.exs:
use Mix.Config

config :riak_core,
  ring_state_dir: 'ring_data_dir',
  handoff_port: 8099,
  handoff_ip: '127.0.0.1',
  schema_dirs: ['priv']

config :sasl,
  errlog_type: :error
The project at this stage is available in the tag start-6
Edit lib/civile.ex and change its content to:
defmodule Civile do
  use Application
  require Logger

  def start(_type, _args) do
    case Civile.Supervisor.start_link do
      {:ok, pid} ->
        {:ok, pid}

      {:error, reason} ->
        Logger.error("Unable to start Civile supervisor because: #{inspect reason}")
    end
  end
end
Create a new file called lib/civile_supervisor.ex with the following content:
defmodule Civile.Supervisor do
  use Supervisor

  def start_link do
    # riak_core appends _sup to the application name.
    Supervisor.start_link(__MODULE__, [], [name: :civile_sup])
  end

  def init(_args) do
    children = []
    supervise(children, strategy: :one_for_one, max_restarts: 5, max_seconds: 10)
  end
end
The project at this stage is available in the tag start-7
Now recompile the project:

mix compile

And start it:

iex --name dev@127.0.0.1 -S mix run
You should see something like this:
Erlang/OTP 22 [erts-10.5] [source] [64-bit] [smp:4:4] [ds:4:4:10] [async-threads:1] [hipe]

00:48:07.530 [info] Starting reporters with []
00:48:07.577 [info] Using node name: 'dev@127.0.0.1'
00:48:07.581 [info] Resolving "127.0.0.1"...
00:48:07.583 [info] Resolved "dev@127.0.0.1" to {127,0,0,1}
00:48:07.583 [info] Resolved "127.0.0.1" to {127,0,0,1}
00:48:07.679 [info] Partisan listening on {127,0,0,1}:49489 listen_addrs: [#{ip => {127,0,0,1},port => 49489}]
00:48:07.686 [info] Not using container orchestration; disabling.
00:48:07.694 [info] node 'dev@127.0.0.1' choosing random seed: {52969312,-576460751045187665,-576460752303423453}
00:48:07.728 [info] node 'dev@127.0.0.1' choosing random seed: {52969312,-576460751045187665,-576460752303423453}
00:48:07.774 [info] Configuring partisan dispatch: false
dets: file "data/cluster_meta/manifest.dets" not properly closed, repairing ...
00:48:07.938 [info] New capability: {riak_core,vnode_routing} = proxy
00:48:07.942 [info] New capability: {riak_core,staged_joins} = true
00:48:07.947 [info] New capability: {riak_core,resizable_ring} = true
00:48:07.951 [info] New capability: {riak_core,fold_req_version} = v2
00:48:07.956 [info] New capability: {riak_core,security} = true
00:48:07.960 [info] New capability: {riak_core,bucket_types} = true
00:48:07.964 [info] New capability: {riak_core,net_ticktime} = true
Interactive Elixir (1.9.1) - press Ctrl+C to exit (type h() ENTER for help)
iex(dev@127.0.0.1)1>
You may get a lager crash log at startup; you can ignore it.
Hit Ctrl+C twice to quit.
Implementing our VNode
Write the following in a new file called lib/civile_vnode.ex:
defmodule Civile.VNode do
  @behaviour :riak_core_vnode

  def start_vnode(partition) do
    :riak_core_vnode_master.get_vnode_pid(partition, __MODULE__)
  end

  def init([partition]) do
    {:ok, %{partition: partition}}
  end

  def handle_command({:ping, v}, _sender, state = %{partition: partition}) do
    {:reply, {:pong, v + 1, node(), partition}, state}
  end

  def handoff_starting(_dest, state) do
    {true, state}
  end

  def handoff_cancelled(state) do
    {:ok, state}
  end

  def handoff_finished(_dest, state) do
    {:ok, state}
  end

  def handle_handoff_command(_fold_req, _sender, state) do
    {:noreply, state}
  end

  def is_empty(state) do
    {true, state}
  end

  def terminate(_reason, _state) do
    :ok
  end

  def delete(state) do
    {:ok, state}
  end

  def handle_handoff_data(_bin_data, state) do
    {:reply, :ok, state}
  end

  def encode_handoff_item(_k, _v) do
  end

  def handle_coverage(_req, _key_spaces, _sender, state) do
    {:stop, :not_implemented, state}
  end

  def handle_exit(_pid, _reason, state) do
    {:noreply, state}
  end

  def handle_overload_command(_, _, _) do
    :ok
  end

  def handle_overload_info(_, _idx) do
    :ok
  end
end
Write the following in a new file called lib/civile_service.ex:
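defmodule Civile.Service do
  def ping(v \\ 1) do
    # Hash the {bucket, key} pair to a position on the ring.
    idx = :riak_core_util.chash_key({"civile", "ping#{v}"})
    # Get the first primary vnode responsible for that position,
    # considering only nodes where Civile.Service is up.
    pref_list = :riak_core_apl.get_primary_apl(idx, 1, Civile.Service)

    [{index_node, _type}] = pref_list

    # Send {:ping, v} to that vnode through its master and wait for the reply.
    :riak_core_vnode_master.sync_command(index_node, {:ping, v}, Civile.VNode_master)
  end
end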
In lib/civile_supervisor.ex, add the vnode master as a child of our supervisor.
Change:
def init(_args) do
  children = []
  supervise(children, strategy: :one_for_one, max_restarts: 5, max_seconds: 10)
end
To:
def init(_args) do
  children = [
    worker(:riak_core_vnode_master, [Civile.VNode], id: Civile.VNode_master_worker)
  ]

  supervise(children, strategy: :one_for_one, max_restarts: 5, max_seconds: 10)
end
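Then register our vnode implementation with riak_core and mark our service as up on this node.
In lib/civile.ex change:
def start(_type, _args) do
  case Civile.Supervisor.start_link do
    {:ok, pid} ->
      {:ok, pid}

    {:error, reason} ->
      Logger.error("Unable to start Civile supervisor because: #{inspect reason}")
  end
end
To:
def start(_type, _args) do
  case Civile.Supervisor.start_link do
    {:ok, pid} ->
      # Register our vnode module so riak_core can start vnodes for it.
      :ok = :riak_core.register(vnode_module: Civile.VNode)
      # Announce that Civile.Service is available on this node.
      :ok = :riak_core_node_watcher.service_up(Civile.Service, self())
      {:ok, pid}

    {:error, reason} ->
      Logger.error("Unable to start Civile supervisor because: #{inspect reason}")
      {:error, reason}
  end
end
The error branch now also returns {:error, reason} after logging, because an application's start/2 callback must return {:ok, pid} or {:error, reason}, while Logger.error/1 on its own returns :ok.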
The project at this stage is available in the tag start-8
Now compile and run again:

mix compile
iex --name dev@127.0.0.1 -S mix run
Inside the shell try our new service:
iex(dev@127.0.0.1)1> Civile.Service.ping
{:pong, 2, :"dev@127.0.0.1", 251195593916248939066258330623111144003363405824}
iex(dev@127.0.0.1)2> Civile.Service.ping 32
{:pong, 33, :"dev@127.0.0.1", 342539446249430371453988632667878832731859189760}
The response is a tuple containing the atom :pong, the number we passed (1 by default) incremented by one, the node, and the partition that handled the request. Right now we only have one node, so the node field isn't very informative, but it will make more sense once we build a cluster in the next steps.
Playing with clustering (on the same machine)
To build a cluster on a single machine we need 3 builds with slightly different configurations, so that no running node reads another node's files or tries to use another node's ports. For this we will create 3 different configs and make 3 different builds.
First add a line at the end of config/config.exs:

import_config "#{Mix.env()}.exs"

This imports the configuration file for the current Mix environment, so that, for example, MIX_ENV=dev1 loads config/dev1.exs.
Now create the following files:
config/dev1.exs
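A sketch of this file, assuming it mirrors config/dev3.exs with the node number changed. The ports 8198 and 8199 are an assumption that follows dev3's numbering; any free ports that differ from the other nodes' ports should work.

```elixir
use Mix.Config

# Assumed values for node 1, following the dev3 pattern.
config :riak_core,
  node: 'dev1@127.0.0.1',
  web_port: 8198,
  handoff_port: 8199,
  ring_state_dir: 'ring_data_dir_1',
  platform_data_dir: 'data_1'
```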
config/dev2.exs
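Likewise for the second node, assuming ports 8298 and 8299 (again an assumption based on dev3's numbering) and its own ring and data directories:

```elixir
use Mix.Config

# Assumed values for node 2, following the dev3 pattern.
config :riak_core,
  node: 'dev2@127.0.0.1',
  web_port: 8298,
  handoff_port: 8299,
  ring_state_dir: 'ring_data_dir_2',
  platform_data_dir: 'data_2'
```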
config/dev3.exs
use Mix.Config

config :riak_core,
  node: 'dev3@127.0.0.1',
  web_port: 8398,
  handoff_port: 8399,
  ring_state_dir: 'ring_data_dir_3',
  platform_data_dir: 'data_3'
Since the default environment is dev, we need a file for that one too, but without any extra config, since the one in config.exs is enough:
config/dev.exs

use Mix.Config
The project at this stage is available in the tag start-9
Now let's run the 3 nodes, each in its own terminal, setting the right environment in MIX_ENV before each command:
MIX_ENV=dev1 iex --name dev1@127.0.0.1 -S mix run
MIX_ENV=dev2 iex --name dev2@127.0.0.1 -S mix run
MIX_ENV=dev3 iex --name dev3@127.0.0.1 -S mix run
In the dev2 and dev3 consoles, run the following to join the nodes:
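A sketch of that step, assuming dev1 is the node the others join and using riak_core's join/1, which takes the target node's name:

```elixir
# Run in the dev2 and dev3 consoles: ask this node to join dev1's cluster.
# Assumes dev1 was started with --name dev1@127.0.0.1 as shown above.
:riak_core.join('dev1@127.0.0.1')
```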
Now let's check the ring status:
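One way to inspect the ring from any of the consoles, assuming riak_core's ring manager and its ring pretty-printer with the legend option:

```elixir
# Fetch this node's current view of the ring.
{:ok, ring} = :riak_core_ring_manager.get_my_ring()
# Print how the partitions are distributed among the nodes, with a legend.
:riak_core_ring.pretty_print(ring, [:legend])
```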
It will take a while for the ring to rebalance; run the two lines above periodically until the ownership settles.
Now you can run ping from any node and it will work:

Civile.Service.ping 32

Since the number we pass determines the target vnode, a call with the same number always ends up on the same vnode, and therefore on the same node, no matter which node it is called from.
After the cluster had rebalanced to around 33% of the ring per node, running it on each of the 3 nodes gave:
iex(dev1@127.0.0.1)2> Civile.Service.ping 32
{:pong, 33, :"dev3@127.0.0.1", 342539446249430371453988632667878832731859189760}

iex(dev2@127.0.0.1)5> Civile.Service.ping 32
{:pong, 33, :"dev3@127.0.0.1", 342539446249430371453988632667878832731859189760}

iex(dev3@127.0.0.1)12> Civile.Service.ping 32
{:pong, 33, :"dev3@127.0.0.1", 342539446249430371453988632667878832731859189760}
This means that the same value was handled by the same node and vnode no matter where the function was called.
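To make the routing concrete, here is a minimal standalone sketch of how a {bucket, key} pair can be mapped to a partition. It assumes riak_core's default key function (SHA-1 over term_to_binary of the pair, as in its chash module) and a ring of 64 partitions. A 64-partition ring is consistent with the outputs above, whose partitions are multiples of 2^160/64 = 2^154 (for example, 251195593916248939066258330623111144003363405824 = 11 × 2^154). PartitionSketch is a hypothetical module name, and the real lookup in get_primary_apl also filters by which nodes run the service, which this sketch ignores.

```elixir
defmodule PartitionSketch do
  # Size of the hash space: SHA-1 digests are 160-bit integers.
  @ring_top :erlang.bsl(1, 160)

  # Hash a {bucket, key} pair with SHA-1 over its external term format,
  # assumed to match riak_core's default chash key function.
  def key_hash(bucket, key) do
    <<hash::unsigned-integer-size(160)>> =
      :crypto.hash(:sha, :erlang.term_to_binary({bucket, key}))

    hash
  end

  # Index of the first partition after the hash on a ring split into
  # `num_partitions` equal slices, wrapping around to partition 0 at the end.
  def partition_for({bucket, key}, num_partitions \\ 64) do
    inc = div(@ring_top, num_partitions)
    position = rem(div(key_hash(bucket, key), inc) + 1, num_partitions)
    position * inc
  end
end
```

Calling PartitionSketch.partition_for({"civile", "ping32"}) always returns the same partition index, which is why the same number reaches the same vnode from any node. Which physical node owns that partition depends on the current ring, so it can change after a rebalance, as the move of partition 342539446249430371453988632667878832731859189760 from dev@127.0.0.1 to dev3@127.0.0.1 shows.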