This is my blog, more about me at marianoguerra.github.io

🦋 @marianoguerra.org 🐘 @marianoguerra@hachyderm.io 🐦 @warianoguerra

Riak Core on Partisan on Elixir Tutorial: We can make a Key Value Store out of that

Previous post: Riak Core on Partisan on Elixir Tutorial: Getting Started.

Now that we have the foundation of our service laid out, let's make it do something useful.

Given that riak_core is an implementation of the Dynamo architecture, which was created to build scalable key value stores, we should follow the same path and build one.

Let's start from the end: imagine our code is ready. What would interacting with it look like?

Let's imagine it goes like this. First we ask the service for the value of a key that doesn't exist yet:

iex(dev@127.0.0.1)1> Civile.Service.get(:k1)
{:ok, :"dev@127.0.0.1",
 913438523331814323877303020447676887284957839360, nil}

We get a tuple as response with 4 items:

  1. status = :ok

    • The request was handled correctly

  2. node = :"dev@127.0.0.1"

    • The node that handled the request

  3. partition = ...

    • The id of the partition that handled this request

  4. result = nil

    • The operation's result (nil since the key has no value yet)

Let's set :k1 to 42:

iex(dev@127.0.0.1)2> Civile.Service.put(:k1, 42)
{:ok, :"dev@127.0.0.1",
 913438523331814323877303020447676887284957839360, nil}

We get a tuple as response with 4 items:

  1. status = :ok

    • The request was handled correctly

  2. node = :"dev@127.0.0.1"

    • The node that handled the request

  3. partition = ...

    • The id of the partition that handled this request (the same as before)

  4. result = nil

    • Nothing of interest to return; the fourth item is there to keep the same shape as the get response

Now let's get :k1 again:

iex(dev@127.0.0.1)3> Civile.Service.get(:k1)
{:ok, :"dev@127.0.0.1",
 913438523331814323877303020447676887284957839360, 42}

Now we get 42 as the result.

Let's try with another key (:k2):

iex(dev@127.0.0.1)4> Civile.Service.get(:k2)
{:ok, :"dev@127.0.0.1",
 365375409332725729550921208179070754913983135744, nil}

Same response as before, but notice that the partition changed. We will see why later.

Let's set :k2 to :hello:

iex(dev@127.0.0.1)5> Civile.Service.put(:k2, :hello)
{:ok, :"dev@127.0.0.1",
 365375409332725729550921208179070754913983135744, nil}

Now get it:

iex(dev@127.0.0.1)6> Civile.Service.get(:k2)
{:ok, :"dev@127.0.0.1",
 365375409332725729550921208179070754913983135744, :hello}
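
Since every call returns the same four-item shape, a caller can pattern match on it to get at the value. The following is only a minimal sketch and not part of the project: the module name ResponseSketch and the function value/1 are made up for illustration.

```elixir
# Hypothetical helper, not part of Civile: extracts the result
# from the {status, node, partition, result} tuple shown above.
defmodule ResponseSketch do
  # Successful response: ignore node and partition, keep the result.
  def value({:ok, _node, _partition, result}), do: {:ok, result}

  # Anything else is passed back wrapped as an error.
  def value(other), do: {:error, other}
end
```

With this, ResponseSketch.value(Civile.Service.get(:k1)) would give {:ok, nil} before the put and {:ok, 42} after it, assuming the service answers as in the session above.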

Now let's build this. First, let's create the API in lib/civile_service.ex:

defmodule Civile.Service do
  def ping(v \\ 1) do
    send_cmd("ping#{v}", {:ping, v})
  end

  def put(k, v) do
    send_cmd(k, {:put, {k, v}})
  end

  def get(k) do
    send_cmd(k, {:get, k})
  end

  defp send_cmd(k, cmd) do
    idx = :riak_core_util.chash_key({"civile", k})
    pref_list = :riak_core_apl.get_primary_apl(idx, 1, Civile.Service)

    [{index_node, _type}] = pref_list

    :riak_core_vnode_master.sync_command(index_node, cmd, Civile.VNode_master)
  end
end

The module was refactored to reuse the hashing logic in a common function called send_cmd/2. ping/1, put/2 and get/1 use send_cmd/2 by passing the value that will be used to hash as first argument and the command to send to the vnode as second argument.
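
To get an intuition for why the same key always lands on the same partition, here is a rough sketch of the idea behind the hashing step. It is not riak_core's code. It assumes the key is hashed with SHA-1 over its term_to_binary form, that the ring has 64 partitions, and that a key belongs to the next partition boundary after its hash. The module RingSketch and its functions are made up for illustration. The two partition ids seen in the session above are 40 and 16 times 2^160 / 64, which is consistent with a ring of 64 partitions.

```elixir
# Illustrative sketch only: names and the ownership rule are assumptions.
defmodule RingSketch do
  # The ring covers the 160-bit SHA-1 space.
  @ring_top :erlang.bsl(1, 160)

  # Hash a {bucket, key} pair into an integer position on the ring.
  def key_hash(bucket, key) do
    <<hash::160>> = :crypto.hash(:sha, :erlang.term_to_binary({bucket, key}))
    hash
  end

  # Map a ring position to the id of the next partition boundary,
  # wrapping around to 0 at the end of the ring.
  def partition_for(hash, ring_size \\ 64) do
    inc = div(@ring_top, ring_size)
    rem((div(hash, inc) + 1) * inc, @ring_top)
  end
end
```

Calling RingSketch.partition_for(RingSketch.key_hash("civile", :k1)) returns the same partition id every time, because the result depends only on the key and the ring size.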

On the vnode side we are going to use ets as the storage for our key value store. We need to initialize it in the Civile.VNode.init/1 function:

def init([partition]) do
  table_name = :erlang.list_to_atom('civile_' ++ :erlang.integer_to_list(partition))

  table_id =
    :ets.new(table_name, [:set, {:write_concurrency, false}, {:read_concurrency, false}])

  state = %{
    partition: partition,
    table_name: table_name,
    table_id: table_id
  }

  {:ok, state}
end

We create a table name unique for our app and partition, then create a new ets table that has read and write concurrency set to false since only this vnode will be reading and writing on it.

Now let's handle the put command:

def handle_command({:put, {k, v}}, _sender, state = %{table_id: table_id, partition: partition}) do
  :ets.insert(table_id, {k, v})
  res = {:ok, node(), partition, nil}
  {:reply, res, state}
end

Most of the code is extracting the values we need from the state to run the useful line:

:ets.insert(table_id, {k, v})

And then building the response to return.

The get command:

def handle_command({:get, k}, _sender, state = %{table_id: table_id, partition: partition}) do
  res =
    case :ets.lookup(table_id, k) do
      [] ->
        {:ok, node(), partition, nil}

      [{_, value}] ->
        {:ok, node(), partition, value}
    end

  {:reply, res, state}
end

It calls :ets.lookup/2 and handles both cases: no value stored for the key, or one available.
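
The storage part can be tried on its own, without riak_core. This is a minimal sketch of the same ets calls outside of a vnode; the module EtsSketch and its function names are made up for illustration.

```elixir
# Standalone sketch of the ets usage in the vnode; names are hypothetical.
defmodule EtsSketch do
  # Create a :set table, where each key maps to at most one value.
  def new(partition) do
    :ets.new(String.to_atom("civile_#{partition}"), [:set])
  end

  # Insert or overwrite the value stored for a key.
  def put(table, k, v) do
    true = :ets.insert(table, {k, v})
    :ok
  end

  # Return the stored value, or nil when the key is missing.
  def get(table, k) do
    case :ets.lookup(table, k) do
      [] -> nil
      [{_, value}] -> value
    end
  end
end
```

Because the table is a :set, a second put for the same key replaces the previous value instead of adding another entry.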

The project at this stage is available in the tag kv-1.

See if the code is correct by compiling the project:

mix compile

Then start one or three nodes and try the commands from the top of this chapter. Let's do it with three; first we clean the current state to start fresh:

rm -rf data_* data ring_data_dir*

Now let's run the 3 nodes by setting the right profile on MIX_ENV before running our commands:

MIX_ENV=dev1 iex --name dev1@127.0.0.1 -S mix run
MIX_ENV=dev2 iex --name dev2@127.0.0.1 -S mix run
MIX_ENV=dev3 iex --name dev3@127.0.0.1 -S mix run

On dev2 and dev3 console run the following to join the nodes:

:riak_core.join('dev1@127.0.0.1')

Now let's check the ring status and wait until it settles, since we are not migrating data on vnode migration yet:

{:ok, ring} = :riak_core_ring_manager.get_my_ring()
:riak_core_ring.pretty_print(ring, [:legend])
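
If the printed ring is hard to read, another way to see whether it has settled is to count how many partitions each node owns. This is a small sketch: OwnersSketch is a made-up module, and it assumes the owners come as a list of {index, node} tuples, which is what I expect :riak_core_ring.all_owners/1 to return.

```elixir
# Hypothetical helper: counts partitions per node from {index, node} tuples.
defmodule OwnersSketch do
  def count_by_node(owners) do
    Enum.frequencies_by(owners, fn {_index, node} -> node end)
  end
end

# Intended usage on a running node (assumption, not run here):
#   {:ok, ring} = :riak_core_ring_manager.get_my_ring()
#   ring |> :riak_core_ring.all_owners() |> OwnersSketch.count_by_node()
```

Once the counts stop changing between calls and are roughly even across the three nodes, the ring has most likely settled.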

From node2:

iex(dev2@127.0.0.1)3> Civile.Service.get(:k1)
{:ok, :"dev1@127.0.0.1",
 913438523331814323877303020447676887284957839360, nil}

From node3:

iex(dev3@127.0.0.1)12> Civile.Service.put(:k1, 42)
{:ok, :"dev1@127.0.0.1",
 913438523331814323877303020447676887284957839360, nil}

From node2:

iex(dev2@127.0.0.1)4> Civile.Service.get(:k1)
{:ok, :"dev1@127.0.0.1",
 913438523331814323877303020447676887284957839360, 42}

We can see that all requests were handled by node1 and by the same partition, since we used the same key on each request and the key is what decides which vnode/partition handles it.