This is my blog, more about me at marianoguerra.github.io

Riak Core on Partisan on Elixir Tutorial: Getting Started

Previous posts: Riak Core on Partisan on Elixir Tutorial: Introduction and Riak Core on Partisan on Elixir Tutorial: Setup.

Creating the project

mix new civiledb --app civile
cd civiledb

The project at this stage is available in the tag start-1

Add uniendo dependency

uniendo is a library that collects all the required dependencies and overrides in a single place, which makes them easy to use from other projects. Add it to the deps section in mix.exs like this:

defp deps do
  [
    {:uniendo, git: "https://github.com/marianoguerra/uniendo.git", branch: "master"}
  ]
end

The project at this stage is available in the tag start-2

Add application configuration

We need to add riak_core to the list of applications to start, and start our own supervisor tree. Modify the application section in mix.exs like this:

def application do
  [
    extra_applications: [:riak_core, :logger],
    mod: {Civile, []}
  ]
end

The project at this stage is available in the tag start-3

First build

Let's try building the project as is, mainly to test that the dependencies are compiled correctly.

First we need to get the dependencies:

mix deps.get

If it's the first time it may ask you:

Could not find Hex, which is needed to build dependency :folsom
Shall I install Hex? (if running non-interactively, use "mix local.hex --force") [Yn]

Just hit enter to continue.

When it finishes, try to build the project:

mix compile

The project at this stage is available in the tag start-4

If it's the first time you build a project with dependencies written in Erlang, it may ask you:

Shall I install rebar3? (if running non-interactively, use "mix local.rebar --force") [Yn]

Just hit enter to continue.

If you get a compile error, try updating the rebar3 version that mix uses with:

mix local.rebar rebar3 ~/bin/rebar3

If the setup dependency fails to build, try commenting out this line in deps/setup/rebar.config:

{post_hooks, [{compile, "make escriptize"}]}.

Like this:

%{post_hooks, [{compile, "make escriptize"}]}.

Making it start

We need to start our application and riak_core. For that we need some initial setup:

mkdir priv config
cp deps/riak_core/priv/riak_core.schema priv/

The project at this stage is available in the tag start-5

Add the following content to config/config.exs:

use Mix.Config

config :riak_core,
  ring_state_dir: 'ring_data_dir',
  handoff_port: 8099,
  handoff_ip: '127.0.0.1',
  schema_dirs: ['priv']

config :sasl,
  errlog_type: :error

The project at this stage is available in the tag start-6
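On Elixir 1.9 and later, the `Config` module can stand in for `use Mix.Config`, which newer releases deprecate. The following is a minimal sketch of the same file under that assumption; it has not been checked against the tagged project, which targets the Elixir version shown later in this post.

```elixir
# config/config.exs, sketched with the Config module (Elixir 1.9+).
# Assumption: nothing else in the project depends on Mix.Config.
import Config

config :riak_core,
  ring_state_dir: 'ring_data_dir',
  handoff_port: 8099,
  handoff_ip: '127.0.0.1',
  schema_dirs: ['priv']

config :sasl,
  errlog_type: :error
```

If you take this route, the per-environment config files would start with `import Config` as well.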

Edit lib/civile.ex and change its content to:

defmodule Civile do
  use Application
  require Logger

  def start(_type, _args) do
    case Civile.Supervisor.start_link do
      {:ok, pid} ->
        {:ok, pid}
      {:error, reason} ->
        Logger.error("Unable to start Civile supervisor because: #{inspect reason}")
        # Application.start/2 must return the error, not the result of Logger.error/1
        {:error, reason}
    end
  end
end

Create a new file called lib/civile_supervisor.ex with the following content:

defmodule Civile.Supervisor do
  use Supervisor

  def start_link do
    # riak_core appends _sup to the application name.
    Supervisor.start_link(__MODULE__, [], [name: :civile_sup])
  end

  def init(_args) do
    children = []
    supervise(children, strategy: :one_for_one, max_restarts: 5, max_seconds: 10)
  end

end

The project at this stage is available in the tag start-7
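`supervise/2` comes from `Supervisor.Spec`, which newer Elixir releases deprecate in favour of `Supervisor.init/2`. The following is a minimal standalone sketch of the same empty supervisor written that way. The module name `SupervisorSketch` and the registered name `:civile_sup_sketch` are hypothetical, chosen so the sketch cannot clash with the real `Civile.Supervisor`.

```elixir
defmodule SupervisorSketch do
  use Supervisor

  def start_link do
    # Hypothetical name; the real module registers itself as :civile_sup.
    Supervisor.start_link(__MODULE__, [], name: :civile_sup_sketch)
  end

  @impl true
  def init(_args) do
    children = []
    # Supervisor.init/2 accepts the same options that supervise/2 received.
    Supervisor.init(children, strategy: :one_for_one, max_restarts: 5, max_seconds: 10)
  end
end

# Start the supervisor and list its children (none yet).
{:ok, pid} = SupervisorSketch.start_link()
IO.inspect(Supervisor.which_children(pid), label: "children")
```

The rest of this post keeps `supervise/2` and `worker/3`, matching the tagged project.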

Now recompile the project:

mix compile

And start it:

iex --name dev@127.0.0.1 -S mix run

You should see something like this:

Erlang/OTP 22 [erts-10.5] [source] [64-bit] [smp:4:4] [ds:4:4:10] [async-threads:1] [hipe]

00:48:07.530 [info] Starting reporters with []
00:48:07.577 [info] Using node name: 'dev@127.0.0.1'
00:48:07.581 [info] Resolving "127.0.0.1"...
00:48:07.583 [info] Resolved "dev@127.0.0.1" to {127,0,0,1}
00:48:07.583 [info] Resolved "127.0.0.1" to {127,0,0,1}
00:48:07.679 [info] Partisan listening on {127,0,0,1}:49489 listen_addrs: [#{ip => {127,0,0,1},port => 49489}]
00:48:07.686 [info] Not using container orchestration; disabling.
00:48:07.694 [info] node 'dev@127.0.0.1' choosing random seed: {52969312,-576460751045187665,-576460752303423453}
00:48:07.728 [info] node 'dev@127.0.0.1' choosing random seed: {52969312,-576460751045187665,-576460752303423453}
00:48:07.774 [info] Configuring partisan dispatch: false
dets: file "data/cluster_meta/manifest.dets" not properly closed, repairing ...
00:48:07.938 [info] New capability: {riak_core,vnode_routing} = proxy
00:48:07.942 [info] New capability: {riak_core,staged_joins} = true
00:48:07.947 [info] New capability: {riak_core,resizable_ring} = true
00:48:07.951 [info] New capability: {riak_core,fold_req_version} = v2
00:48:07.956 [info] New capability: {riak_core,security} = true
00:48:07.960 [info] New capability: {riak_core,bucket_types} = true
00:48:07.964 [info] New capability: {riak_core,net_ticktime} = true
Interactive Elixir (1.9.1) - press Ctrl+C to exit (type h() ENTER for help)
iex(dev@127.0.0.1)1>

You may see a lager crash log at startup; you can ignore it.

Hit Ctrl+C twice to quit.

Implementing our VNode

Write the following in a new file called lib/civile_vnode.ex:

defmodule Civile.VNode do
  @behaviour :riak_core_vnode

  def start_vnode(partition) do
    :riak_core_vnode_master.get_vnode_pid(partition, __MODULE__)
  end

  def init([partition]) do
    {:ok, %{partition: partition}}
  end

  def handle_command({:ping, v}, _sender, state = %{partition: partition}) do
    {:reply, {:pong, v + 1, node(), partition}, state}
  end

  def handoff_starting(_dest, state) do
    {true, state}
  end

  def handoff_cancelled(state) do
    {:ok, state}
  end

  def handoff_finished(_dest, state) do
    {:ok, state}
  end

  def handle_handoff_command(_fold_req, _sender, state) do
    {:noreply, state}
  end

  def is_empty(state) do
    {true, state}
  end

  def terminate(_reason, _state) do
    :ok
  end

  def delete(state) do
    {:ok, state}
  end

  def handle_handoff_data(_bin_data, state) do
    {:reply, :ok, state}
  end

  def encode_handoff_item(_k, _v) do
  end

  def handle_coverage(_req, _key_spaces, _sender, state) do
    {:stop, :not_implemented, state}
  end

  def handle_exit(_pid, _reason, state) do
    {:noreply, state}
  end

  def handle_overload_command(_, _, _) do
    :ok
  end

  def handle_overload_info(_, _idx) do
    :ok
  end
end

Write the following in a new file called lib/civile_service.ex:

defmodule Civile.Service do

  def ping(v \\ 1) do
    idx = :riak_core_util.chash_key({"civile", "ping#{v}"})
    pref_list = :riak_core_apl.get_primary_apl(idx, 1, Civile.Service)

    [{index_node, _type}] = pref_list

    :riak_core_vnode_master.sync_command(index_node, {:ping, v}, Civile.VNode_master)
  end

end

In lib/civile_supervisor.ex, add the vnode master as a child of our supervisor.

Change:

def init(_args) do
  children = []
  supervise(children, strategy: :one_for_one, max_restarts: 5, max_seconds: 10)
end

To:

def init(_args) do
  children = [
    worker(:riak_core_vnode_master, [Civile.VNode], id: Civile.VNode_master_worker)
  ]
  supervise(children, strategy: :one_for_one, max_restarts: 5, max_seconds: 10)
end

And register our vnode implementation on riak_core.

In lib/civile.ex change:

def start(_type, _args) do
  case Civile.Supervisor.start_link do
    {:ok, pid} ->
      {:ok, pid}
    {:error, reason} ->
      Logger.error("Unable to start Civile supervisor because: #{inspect reason}")
      {:error, reason}
  end
end

To:

def start(_type, _args) do
  case Civile.Supervisor.start_link do
    {:ok, pid} ->
      :ok = :riak_core.register(vnode_module: Civile.VNode)
      :ok = :riak_core_node_watcher.service_up(Civile.Service, self())
      {:ok, pid}
    {:error, reason} ->
      Logger.error("Unable to start Civile supervisor because: #{inspect reason}")
      # Application.start/2 must return the error, not the result of Logger.error/1
      {:error, reason}
  end
end

The project at this stage is available in the tag start-8

Now compile and run again:

mix compile
iex --name dev@127.0.0.1 -S mix run

Inside the shell try our new service:

iex(dev@127.0.0.1)1> Civile.Service.ping
{:pong, 2, :"dev@127.0.0.1",
 251195593916248939066258330623111144003363405824}

iex(dev@127.0.0.1)2> Civile.Service.ping 32
{:pong, 33, :"dev@127.0.0.1",
 342539446249430371453988632667878832731859189760}

The response is a tuple that contains the atom :pong, the number we passed (or 1 by default) incremented by one, the node that handled the request, and the partition that handled it. With a single node the last two values tell us little, but they will make more sense in the next steps.
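To see where that tuple comes from without starting riak_core, here is a small standalone sketch. `PingSketch` is a hypothetical module that copies only the `handle_command/3` clause from `Civile.VNode`; the sender value and the partition index 0 are made up for illustration.

```elixir
defmodule PingSketch do
  # Same clause as Civile.VNode.handle_command/3, copied so it runs without riak_core.
  def handle_command({:ping, v}, _sender, state = %{partition: partition}) do
    {:reply, {:pong, v + 1, node(), partition}, state}
  end
end

# 0 is a made-up partition index; in the real vnode it is the one passed to init/1.
{:reply, {:pong, 2, _node, 0}, _state} =
  PingSketch.handle_command({:ping, 1}, :no_sender, %{partition: 0})
```

The second element of the `:reply` tuple is what `Civile.Service.ping` returns to the caller; the third is the vnode state, which stays unchanged.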

Playing with clustering (on the same machine)

To build a cluster on a single machine, each running node must be prevented from reading another node's files or trying to use another node's ports. For this we will create 3 slightly different configurations and make 3 different builds.

First add a line at the end of config/config.exs:

import_config "#{Mix.env}.exs"

This imports the configuration file that matches the current Mix environment.

Now create the following files:

config/dev1.exs

use Mix.Config

config :riak_core,
  node: 'dev1@127.0.0.1',
  web_port: 8198,
  handoff_port: 8199,
  ring_state_dir: 'ring_data_dir_1',
  platform_data_dir: 'data_1'

config/dev2.exs

use Mix.Config

config :riak_core,
  node: 'dev2@127.0.0.1',
  web_port: 8298,
  handoff_port: 8299,
  ring_state_dir: 'ring_data_dir_2',
  platform_data_dir: 'data_2'

config/dev3.exs

use Mix.Config

config :riak_core,
  node: 'dev3@127.0.0.1',
  web_port: 8398,
  handoff_port: 8399,
  ring_state_dir: 'ring_data_dir_3',
  platform_data_dir: 'data_3'

Since the default environment is dev we need to have a file for that one too, but without any config since the one in config.exs is ok:

config/dev.exs

use Mix.Config

The project at this stage is available in the tag start-9
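The three devN files differ only in the node index. The pattern can be summed up in a small standalone sketch; `NodeConfigSketch` is a hypothetical helper used only for illustration and is not part of the project.

```elixir
defmodule NodeConfigSketch do
  # Hypothetical helper: derives the values used in config/devN.exs from N.
  def for_node(n) when n in 1..9 do
    [
      node: String.to_charlist("dev#{n}@127.0.0.1"),
      # Ports follow the pattern 8N98 / 8N99 so nodes never share a port.
      web_port: 8098 + n * 100,
      handoff_port: 8099 + n * 100,
      # Each node gets its own ring and data directories.
      ring_state_dir: String.to_charlist("ring_data_dir_#{n}"),
      platform_data_dir: String.to_charlist("data_#{n}")
    ]
  end
end

IO.inspect(NodeConfigSketch.for_node(2))
```

Adding a fourth node would then mean one more config file following the same pattern.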

Now let's run the 3 nodes by setting the right profile on MIX_ENV before running our commands:

MIX_ENV=dev1 iex --name dev1@127.0.0.1 -S mix run
MIX_ENV=dev2 iex --name dev2@127.0.0.1 -S mix run
MIX_ENV=dev3 iex --name dev3@127.0.0.1 -S mix run

On the dev2 and dev3 consoles, run the following to join them to dev1:

:riak_core.join('dev1@127.0.0.1')

Now let's check the ring status:

{:ok, ring} = :riak_core_ring_manager.get_my_ring
:riak_core_ring.pretty_print(ring, [:legend])

It will take a while for the ring to rebalance. Run the two lines above periodically until the output settles.

Now you can run ping from any node and it will work:

Civile.Service.ping 32

The number we pass is part of the key that selects the target vnode, so a call with the same number always ends up on the same vnode, and therefore on the same node. The call works transparently from any node in the cluster.

Running it on the 3 nodes after the cluster had rebalanced to around 33% each, I got:

iex(dev1@127.0.0.1)2> Civile.Service.ping 32
{:pong, 33, :"dev3@127.0.0.1",
 342539446249430371453988632667878832731859189760}
iex(dev2@127.0.0.1)5> Civile.Service.ping 32
{:pong, 33, :"dev3@127.0.0.1",
 342539446249430371453988632667878832731859189760}
iex(dev3@127.0.0.1)12> Civile.Service.ping 32
{:pong, 33, :"dev3@127.0.0.1",
 342539446249430371453988632667878832731859189760}

This means that the same value was handled by the same node and vnode no matter where the function was called.
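The idea behind this deterministic routing can be sketched in plain Elixir, without riak_core. This is an illustration under stated assumptions, not riak_core's actual code: it assumes a ring of 64 partitions over a 160-bit SHA-1 key space, and it assumes a key is served by the first partition after its hash position, wrapping around at the end of the ring. `RingSketch` and its functions are hypothetical names.

```elixir
defmodule RingSketch do
  import Bitwise

  # Assumption: 64 partitions over a 160-bit (SHA-1) key space.
  @ring_size 64
  @ring_top 1 <<< 160

  # Width of the slice of key space owned by each partition.
  def partition_width, do: div(@ring_top, @ring_size)

  # Hash the {bucket, key} pair to an integer position on the ring.
  def key_position(bucket, key) do
    <<position::unsigned-integer-size(160)>> =
      :crypto.hash(:sha, :erlang.term_to_binary({bucket, key}))

    position
  end

  # Assumption: the key goes to the first partition after its position,
  # wrapping around to partition 0 at the end of the ring.
  def partition_for(bucket, key) do
    slot = div(key_position(bucket, key), partition_width())
    rem(slot + 1, @ring_size) * partition_width()
  end
end

# The same bucket and key always yield the same partition index.
IO.inspect(RingSketch.partition_for("civile", "ping32"))
```

The partition values in the outputs above fit this layout: 251195593916248939066258330623111144003363405824 is 11 times the partition width for a 64-partition ring, and 342539446249430371453988632667878832731859189760 is 15 times that width. Whether the sketch picks the same partition as riak_core for a given key is not verified here.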