This is my blog, more about me at marianoguerra.github.io

🦋 @marianoguerra.org 🐘 @marianoguerra@hachyderm.io 🐦 @warianoguerra

Riak Core on Partisan on Elixir Tutorial: Migrating Data with Handoff

Previous post: Riak Core on Partisan on Elixir Tutorial: We can make a Key Value Store out of that.

We are going to implement handoff by completing some of the remaining callbacks in our vnode.

Why? If a node dies and another takes over its work, or if we add a new node and the vnodes must be rebalanced, we need to handle handoff.

The reasons to start a handoff are:

  • A ring update event for a ring that all other nodes have already seen.

  • A secondary vnode is idle for a period of time and the primary, original owner of the partition is up again.

When this happens riak_core informs the vnode that handoff is starting by calling handoff_starting. If it returns false the handoff is cancelled. If it returns true riak_core calls is_empty, which must return true if the vnode is empty or false if it has something to hand off. In our case we ask for the first element of the ETS table, and if it's the special value $end_of_table we know the table is empty. If is_empty returns true the handoff is considered finished; if it returns false, handle_handoff_command is called.
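The emptiness check described above can be tried on its own in iex. This is a minimal sketch on a throwaway table (the name :demo_table is made up for the example), not the vnode's real table:

```elixir
# A throwaway ETS table standing in for the vnode's table.
table = :ets.new(:demo_table, [:set])

# On an empty table :ets.first/1 returns the atom :"$end_of_table".
empty_before = :ets.first(table) == :"$end_of_table"

:ets.insert(table, {"k1", 1})

# With at least one entry, :ets.first/1 returns a key instead.
empty_after = :ets.first(table) == :"$end_of_table"

IO.inspect({empty_before, empty_after})
# prints {true, false}
```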

handle_handoff_command receives a fold request as its first argument. This is a record defined in the riak_core_vnode.hrl header file as riak_core_fold_req_v2; we extract it with the Record module, which generates the fold_req_v2 macro.

handle_handoff_command must iterate through all the keys the vnode stores and, for each of them, call foldfun with the key as first argument, the value as second argument and the latest acc_in value as third.

The result of each call is the new acc_in that you must pass to the next call to foldfun. The last accumulator value (acc_final) must be returned by handle_handoff_command.
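To see the accumulator threading in isolation, here is a small sketch that folds over a throwaway ETS table. The foldfun below is a hypothetical stand-in that just collects the pairs; the real one is supplied by riak_core inside the fold request:

```elixir
table = :ets.new(:fold_demo, [:set])
:ets.insert(table, [{"k1", 1}, {"k2", 2}, {"k3", 3}])

# Hypothetical stand-in for the foldfun carried by the fold request.
foldfun = fn k, v, acc -> [{k, v} | acc] end
acc0 = []

# Each call receives the accumulator returned by the previous call.
acc_final =
  :ets.foldl(fn {k, v}, acc_in -> foldfun.(k, v, acc_in) end, acc0, table)

# A :set table has no defined traversal order, so sort before inspecting.
IO.inspect(Enum.sort(acc_final))
# prints [{"k1", 1}, {"k2", 2}, {"k3", 3}]
```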

For each call to foldfun(k, v, acc_in) riak_core sends the pair to the new vnode. Before sending, it calls encode_handoff_item(k, v), where you must encode the data into a binary.

When the new vnode receives the value, riak_core calls handle_handoff_data, where we decode the received data and do the appropriate thing with it; in our case, insert it into the ETS table.
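The encode and decode steps are a plain round trip through the Erlang external term format. A minimal sketch, using a made-up module name (HandoffCodecSketch) so it can run outside the vnode:

```elixir
defmodule HandoffCodecSketch do
  # Mirrors what encode_handoff_item does: pack the pair into a binary.
  def encode(k, v), do: :erlang.term_to_binary({k, v})

  # Mirrors the first step of handle_handoff_data: unpack the pair.
  def decode(bin), do: :erlang.binary_to_term(bin)
end

bin = HandoffCodecSketch.encode("k1", 1)
{k, v} = HandoffCodecSketch.decode(bin)

IO.inspect({is_binary(bin), k, v})
# prints {true, "k1", 1}
```

Any encoding works as long as both functions agree on it; term_to_binary is simply the shortest option for Erlang terms.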

Once all the key/value pairs have been sent, handoff_finished is called and then delete, so that we clean up the data on the old vnode.

Other commands may be sent to the vnode while the handoff is running. For each of them you can choose to do one of the following:

  • Handle it in the current vnode

  • Forward it to the vnode we are handing off to

  • Drop it

What to do depends on the design of your app; all of them have tradeoffs.
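The three options above could be sketched as three return values. This is an illustration only: the module, the on_command function and the policy key in the state are hypothetical, and the {:forward, state} and {:drop, state} tuples are what I understand riak_core_vnode to accept from handle_handoff_command, so check them against the riak_core version you use:

```elixir
defmodule HandoffPolicySketch do
  # Handle it in the current vnode: answer from local state.
  def on_command({:get, k}, %{policy: :handle} = state) do
    {:reply, Map.get(state.data, k), state}
  end

  # Forward it to the vnode we are handing off to.
  def on_command(_request, %{policy: :forward} = state), do: {:forward, state}

  # Drop it: the caller gets no answer.
  def on_command(_request, %{policy: :drop} = state), do: {:drop, state}
end

state = %{policy: :handle, data: %{"k1" => 1}}

IO.inspect(HandoffPolicySketch.on_command({:get, "k1"}, state))
IO.inspect(HandoffPolicySketch.on_command({:get, "k1"}, %{state | policy: :forward}))
IO.inspect(HandoffPolicySketch.on_command({:get, "k1"}, %{state | policy: :drop}))
```

In the vnode in this post the generic clause of handle_handoff_command returns {:noreply, state}, which ignores the request without replying.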

A diagram of the flow:

+-----------+      +----------+        +----------+
|           | true |          | false  |          |
| Starting  +------> is_empty +--------> fold_req |
|           |      |          |        |          |
+-----+-----+      +----+-----+        +----+-----+
      |                 |                   |
      | false           | true              | ok
      |                 |                   |
+-----v-----+           |              +----v-----+     +--------+
|           |           |              |          |     |        |
| Cancelled |           +--------------> finished +-----> delete |
|           |                          |          |     |        |
+-----------+                          +----------+     +--------+

Replace the content of lib/civile_vnode.ex with:

defmodule Civile.VNode do
  require Logger
  @behaviour :riak_core_vnode

  require Record
  Record.defrecord :fold_req_v2, :riak_core_fold_req_v2, Record.extract(:riak_core_fold_req_v2, from_lib: "riak_core/include/riak_core_vnode.hrl")

  def start_vnode(partition) do
    :riak_core_vnode_master.get_vnode_pid(partition, __MODULE__)
  end

  def init([partition]) do
    table_name = :erlang.list_to_atom('civile_' ++ :erlang.integer_to_list(partition))

    table_id =
      :ets.new(table_name, [:set, {:write_concurrency, false}, {:read_concurrency, false}])

    state = %{
      partition: partition,
      table_name: table_name,
      table_id: table_id
    }

    {:ok, state}
  end

  def handle_command({:ping, v}, _sender, state = %{partition: partition}) do
    {:reply, {:pong, v + 1, node(), partition}, state}
  end

  def handle_command({:put, {k, v}}, _sender, state = %{table_id: table_id, partition: partition}) do
    :ets.insert(table_id, {k, v})
    res = {:ok, node(), partition, nil}
    {:reply, res, state}
  end

  def handle_command({:get, k}, _sender, state = %{table_id: table_id, partition: partition}) do
    res =
      case :ets.lookup(table_id, k) do
        [] ->
          {:ok, node(), partition, nil}

        [{_, value}] ->
          {:ok, node(), partition, value}
      end

    {:reply, res, state}
  end

  def handoff_starting(_dest, state = %{partition: partition}) do
    Logger.debug "handoff_starting #{partition}"
    {true, state}
  end

  def handoff_cancelled(state = %{partition: partition}) do
    Logger.debug "handoff_cancelled #{partition}"
    {:ok, state}
  end

  def handoff_finished(_dest, state = %{partition: partition}) do
    Logger.debug "handoff_finished #{partition}"
    {:ok, state}
  end

  def handle_handoff_command(fold_req_v2() = fold_req, _sender, state = %{table_id: table_id, partition: partition}) do
    Logger.debug "handoff #{partition}"
    foldfun = fold_req_v2(fold_req, :foldfun)
    acc0 = fold_req_v2(fold_req, :acc0)
    acc_final = :ets.foldl(fn {k, v}, acc_in ->
        Logger.debug "handoff #{partition}: #{k} #{v}"
        foldfun.(k, v, acc_in)
    end, acc0, table_id)
    {:reply, acc_final, state}
  end

  def handle_handoff_command(_request, _sender, state = %{partition: partition}) do
    Logger.debug "Handoff generic request, ignoring #{partition}"
    {:noreply, state}
  end

  def is_empty(state = %{table_id: table_id, partition: partition}) do
    is_empty = (:ets.first(table_id) == :"$end_of_table")
    Logger.debug "is_empty #{partition}: #{is_empty}"
    {is_empty, state}
  end

  def terminate(reason, %{partition: partition}) do
    # reason may be a tuple, which cannot be interpolated directly
    Logger.debug "terminate #{partition}: #{inspect(reason)}"
    :ok
  end

  def delete(state = %{table_id: table_id, partition: partition}) do
    Logger.debug "delete #{partition}"
    true = :ets.delete(table_id)
    {:ok, state}
  end

  def handle_handoff_data(bin_data, state = %{table_id: table_id, partition: partition}) do
    {k, v} = :erlang.binary_to_term(bin_data)
    :ets.insert(table_id, {k, v})
    Logger.debug "handle_handoff_data #{partition}: #{k} #{v}"
    {:reply, :ok, state}
  end

  def encode_handoff_item(k, v) do
    Logger.debug "encode_handoff_item #{k} #{v}"
    :erlang.term_to_binary({k, v})
  end

  def handle_coverage(_req, _key_spaces, _sender, state) do
    {:stop, :not_implemented, state}
  end

  def handle_exit(_pid, _reason, state) do
    {:noreply, state}
  end

  def handle_overload_command(_, _, _) do
    :ok
  end

  def handle_overload_info(_, _idx) do
    :ok
  end
end

Stop all nodes if you have them running.

Let's clean all the data on each node, and the ring state, so that we can join them again each time we want to try handoff. To make the commands simpler to run and remember, put this in a file called Makefile at the root of the project. Make sure the indented lines are indented with tabs and not spaces:

start_single:
	iex --name dev@127.0.0.1 -S mix run

start_node1:
	MIX_ENV=dev1 iex --name dev1@127.0.0.1 -S mix run

start_node2:
	MIX_ENV=dev2 iex --name dev2@127.0.0.1 -S mix run

start_node3:
	MIX_ENV=dev3 iex --name dev3@127.0.0.1 -S mix run

clean:
	rm -rf data_1 data_2 data_3 data log ring_data_dir*

setup:
	mix deps.get

The project at this stage is available in the tag handoff-1

Now let's clean the data to start from scratch:

make clean

On a new terminal:

make start_node1

Inside iex run:

for i <- :lists.seq(1, 100) do
    Civile.Service.put("k#{i}", i)
end

This will insert k1: 1, k2: 2, ..., k100: 100. Since we are running a single node, they will all go to vnodes in node1.

On a new terminal:

make start_node2

Inside iex run:

for i <- :lists.seq(101, 200) do
    Civile.Service.put("k#{i}", i)
end

This will insert k101: 101, k102: 102, ..., k200: 200. Since we haven't joined node2 to node1, they will all go to vnodes in node2.

On a new terminal:

make start_node3

Inside iex run:

for i <- :lists.seq(201, 300) do
    Civile.Service.put("k#{i}", i)
end

This will insert k201: 201, k202: 202, ..., k300: 300. Since we haven't joined node3 to node1 or node2, they will all go to vnodes in node3.

Let's join node2 and node3 to node1 to form a cluster. On node2 and node3 run:

:riak_core.join('dev1@127.0.0.1')

You will see handoff transferring vnodes and their data between nodes. When a transfer finishes, the vnode on the old node removes its data and stops.

Periodically you can check the status with:

{:ok, ring} = :riak_core_ring_manager.get_my_ring
:riak_core_ring.pretty_print(ring, [:legend])
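If the pretty-printed ring is hard to read, a per-node partition count can be a quicker check. The sketch below is only an illustration: the module name is made up, the sample owners list is invented, and it assumes :riak_core_ring.all_owners/1 returns a list of {index, node} tuples.

```elixir
defmodule RingSummarySketch do
  # owners: a list of {partition_index, node} tuples.
  def partitions_per_node(owners) do
    owners
    |> Enum.map(fn {_index, node} -> node end)
    |> Enum.frequencies()
  end
end

# Invented sample data; on a live node you would instead pass
# :riak_core_ring.all_owners(ring) (assumed to have this shape).
owners = [
  {0, :"dev1@127.0.0.1"},
  {1, :"dev2@127.0.0.1"},
  {2, :"dev3@127.0.0.1"},
  {3, :"dev1@127.0.0.1"}
]

counts = RingSummarySketch.partitions_per_node(owners)
IO.inspect(counts)
```

Once handoff has settled, the counts should be roughly even across the three nodes.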