Riak Core on Partisan on Elixir Tutorial: Migrating Data with Handoff
Previous post: Riak Core on Partisan on Elixir Tutorial: We can make a Key Value Store out of that.
We are going to implement handoff by completing some of the remaining callbacks in our vnode.
Why? If a node dies and another takes over its work, or if we add a new node and the vnodes must be rebalanced, we need to handle handoff.
The reasons to start a handoff are:
A ring update event for a ring that all other nodes have already seen.
A secondary vnode is idle for a period of time and the primary, original owner of the partition is up again.
When this happens, riak_core informs the vnode that handoff is starting by calling handoff_starting. If it returns false the handoff is cancelled; if it returns true, riak_core calls is_empty, which must return true if the vnode has nothing to hand off and false if it has data. In our case we ask for the first element of the ETS table, and if it is the special value $end_of_table we know the table is empty. If is_empty returns true the handoff is considered finished; if it returns false, handle_handoff_command is called.
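To make the emptiness check concrete, here is a minimal sketch that runs outside riak_core. The table name and the is_table_empty function are made up for the illustration; only the comparison against :"$end_of_table" mirrors what the vnode does.

```elixir
# Stand-in for the table the vnode creates in init/1 (the name is arbitrary).
table = :ets.new(:demo_is_empty, [:set])

# Same comparison the vnode's is_empty/1 callback performs.
is_table_empty = fn tab -> :ets.first(tab) == :"$end_of_table" end

empty_before = is_table_empty.(table)
:ets.insert(table, {"k1", 1})
empty_after = is_table_empty.(table)

IO.inspect({empty_before, empty_after})
# prints {true, false}
```

In the real callback the boolean is returned together with the state, as {is_empty, state}.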
handle_handoff_command receives a fold request, a record defined in the riak_core_vnode.hrl header file as riak_core_fold_req_v2. We extract it with the Record module, which generates the fold_req_v2 macro we use to read its fields.
handle_handoff_command must iterate through all the keys the vnode stores and, for each of them, call foldfun with the key as the first argument, the value as the second and the latest acc_in value as the third.
The result of each call is the new acc_in you must pass to the next call to foldfun. The last accumulator value (acc_final) must be returned by handle_handoff_command.
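The accumulator threading can be tried without riak_core. The sketch below assumes a hypothetical local record, fold_req, with only the two fields the vnode reads, and a stand-in foldfun that just collects the pairs; the real foldfun is supplied by riak_core and ships each item to the receiving vnode.

```elixir
defmodule FoldSketch do
  require Record

  # Hypothetical local record with the two fields we read. In the tutorial
  # the real definition is extracted from riak_core_vnode.hrl instead.
  Record.defrecord(:fold_req, foldfun: nil, acc0: nil)

  # Builds a request, playing the role riak_core has in a real handoff.
  def new_request(foldfun, acc0), do: fold_req(foldfun: foldfun, acc0: acc0)

  # Same shape as the fold inside handle_handoff_command: every {k, v} pair
  # is passed to foldfun along with the accumulator from the previous call.
  def run(request, table) do
    foldfun = fold_req(request, :foldfun)
    acc0 = fold_req(request, :acc0)

    :ets.foldl(fn {k, v}, acc_in -> foldfun.(k, v, acc_in) end, acc0, table)
  end
end

table = :ets.new(:demo_fold, [:set])
:ets.insert(table, [{"k1", 1}, {"k2", 2}, {"k3", 3}])

# Stand-in foldfun: collects the pairs instead of sending them anywhere.
request = FoldSketch.new_request(fn k, v, acc -> [{k, v} | acc] end, [])
acc_final = FoldSketch.run(request, table)

# A :set table has no defined traversal order, so sort before printing.
IO.inspect(Enum.sort(acc_final))
```

Whatever foldfun returns is opaque to the vnode; it only has to pass it along and return the last value.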
For each call to foldfun(k, v, acc_in), riak_core sends the item to the new vnode. It must encode the data before sending, which it does by calling encode_handoff_item(k, v), where you decide how the key and value are serialized.
When the new vnode receives the value, it must decode it and do something with it. This happens in handle_handoff_data, where we decode the received data and, in our case, insert it into the ETS table.
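The encode and decode steps are a plain round trip through the Erlang external term format, which we can check in isolation. In this sketch the encode and receive_item functions are illustrative stand-ins for encode_handoff_item and handle_handoff_data, and the table plays the role of the receiving vnode's table.

```elixir
# Sender side: same encoding the vnode uses in encode_handoff_item/2.
encode = fn k, v -> :erlang.term_to_binary({k, v}) end

# Receiver side: a fresh table standing in for the new vnode's storage.
table = :ets.new(:demo_receiver, [:set])

# Mirrors handle_handoff_data/2: decode the binary and store the pair.
receive_item = fn bin_data ->
  {k, v} = :erlang.binary_to_term(bin_data)
  :ets.insert(table, {k, v})
  :ok
end

bin = encode.("k1", 1)
result = receive_item.(bin)
stored = :ets.lookup(table, "k1")

IO.inspect({is_binary(bin), result, stored})
# prints {true, :ok, [{"k1", 1}]}
```

Any format works as long as both callbacks agree on it, since the same vnode module runs on the sending and the receiving side.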
Once all the key/value pairs have been sent, handoff_finished is called and then delete, so we can clean up the data on the old vnode.
You can also decide how to handle other commands sent to the vnode while the handoff is running. You can choose one of the following:
Handle it in the current vnode
Forward it to the vnode we are handing off to
Drop it
What to do depends on the design of your app; all of them have tradeoffs.
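The three options map onto different return values from handle_handoff_command. The sketch below is a standalone module, not a real vnode: the state is a plain map, and the policy (answer reads locally, forward writes, drop everything else) is picked only to show each option once. To my understanding {:forward, state} and {:drop, state} are return values riak_core accepts from this callback, but check the riak_core_vnode behaviour of the version you are using.

```elixir
defmodule HandoffPolicySketch do
  # Option 1: handle it in the current vnode. The data is still here until
  # delete runs, so a read can be answered locally.
  def handle_handoff_command({:get, k}, _sender, state = %{data: data}) do
    {:reply, Map.get(data, k), state}
  end

  # Option 2: forward it to the vnode receiving the handoff.
  def handle_handoff_command({:put, _kv}, _sender, state) do
    {:forward, state}
  end

  # Option 3: drop it.
  def handle_handoff_command(_request, _sender, state) do
    {:drop, state}
  end
end

state = %{data: %{"k1" => 1}}

get_result = HandoffPolicySketch.handle_handoff_command({:get, "k1"}, nil, state)
put_result = HandoffPolicySketch.handle_handoff_command({:put, {"k2", 2}}, nil, state)
other_result = HandoffPolicySketch.handle_handoff_command(:anything_else, nil, state)

IO.inspect({get_result, put_result, other_result})
```

The vnode in this post takes the simplest route and ignores anything that is not a fold request.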
A diagram of the flow:
+-----------+      +----------+        +----------+
|           | true |          | false  |          |
| Starting  +------> is_empty +--------> fold_req |
|           |      |          |        |          |
+-----+-----+      +----+-----+        +----+-----+
      |                 |                   |
      | false           | true              | ok
      |                 |                   |
+-----v-----+           |              +----v-----+     +--------+
|           |           |              |          |     |        |
| Cancelled |           +--------------> finished +-----> delete |
|           |                          |          |     |        |
+-----------+                          +----------+     +--------+
Replace the content of lib/civile_vnode.ex with:
defmodule Civile.VNode do
  require Logger
  @behaviour :riak_core_vnode

  require Record
  Record.defrecord :fold_req_v2, :riak_core_fold_req_v2,
    Record.extract(:riak_core_fold_req_v2, from_lib: "riak_core/include/riak_core_vnode.hrl")

  def start_vnode(partition) do
    :riak_core_vnode_master.get_vnode_pid(partition, __MODULE__)
  end

  def init([partition]) do
    table_name = :erlang.list_to_atom('civile_' ++ :erlang.integer_to_list(partition))

    table_id =
      :ets.new(table_name, [:set, {:write_concurrency, false}, {:read_concurrency, false}])

    state = %{
      partition: partition,
      table_name: table_name,
      table_id: table_id
    }

    {:ok, state}
  end

  def handle_command({:ping, v}, _sender, state = %{partition: partition}) do
    {:reply, {:pong, v + 1, node(), partition}, state}
  end

  def handle_command({:put, {k, v}}, _sender,
                     state = %{table_id: table_id, partition: partition}) do
    :ets.insert(table_id, {k, v})
    res = {:ok, node(), partition, nil}
    {:reply, res, state}
  end

  def handle_command({:get, k}, _sender,
                     state = %{table_id: table_id, partition: partition}) do
    res =
      case :ets.lookup(table_id, k) do
        [] ->
          {:ok, node(), partition, nil}

        [{_, value}] ->
          {:ok, node(), partition, value}
      end

    {:reply, res, state}
  end

  def handoff_starting(_dest, state = %{partition: partition}) do
    Logger.debug "handoff_starting #{partition}"
    {true, state}
  end

  def handoff_cancelled(state = %{partition: partition}) do
    Logger.debug "handoff_cancelled #{partition}"
    {:ok, state}
  end

  def handoff_finished(_dest, state = %{partition: partition}) do
    Logger.debug "handoff_finished #{partition}"
    {:ok, state}
  end

  def handle_handoff_command(fold_req_v2() = fold_req, _sender,
                             state = %{table_id: table_id, partition: partition}) do
    Logger.debug "handoff #{partition}"
    foldfun = fold_req_v2(fold_req, :foldfun)
    acc0 = fold_req_v2(fold_req, :acc0)
    acc_final = :ets.foldl(fn {k, v}, acc_in ->
      Logger.debug "handoff #{partition}: #{k} #{v}"
      foldfun.(k, v, acc_in)
    end, acc0, table_id)
    {:reply, acc_final, state}
  end

  def handle_handoff_command(_request, _sender, state = %{partition: partition}) do
    Logger.debug "Handoff generic request, ignoring #{partition}"
    {:noreply, state}
  end

  def is_empty(state = %{table_id: table_id, partition: partition}) do
    is_empty = (:ets.first(table_id) == :"$end_of_table")
    Logger.debug "is_empty #{partition}: #{is_empty}"
    {is_empty, state}
  end

  def terminate(reason, %{partition: partition}) do
    Logger.debug "terminate #{partition}: #{reason}"
    :ok
  end

  def delete(state = %{table_id: table_id, partition: partition}) do
    Logger.debug "delete #{partition}"
    true = :ets.delete(table_id)
    {:ok, state}
  end

  def handle_handoff_data(bin_data, state = %{table_id: table_id, partition: partition}) do
    {k, v} = :erlang.binary_to_term(bin_data)
    :ets.insert(table_id, {k, v})
    Logger.debug "handle_handoff_data #{partition}: #{k} #{v}"
    {:reply, :ok, state}
  end

  def encode_handoff_item(k, v) do
    Logger.debug "encode_handoff_item #{k} #{v}"
    :erlang.term_to_binary({k, v})
  end

  def handle_coverage(_req, _key_spaces, _sender, state) do
    {:stop, :not_implemented, state}
  end

  def handle_exit(_pid, _reason, state) do
    {:noreply, state}
  end

  def handle_overload_command(_, _, _) do
    :ok
  end

  def handle_overload_info(_, _idx) do
    :ok
  end
end
Stop all nodes if you have them running.
Let's clean all the data on each node and the ring state so that we can join the nodes again each time we want to try handoff. To make the commands simpler to run and remember, put this in a file called Makefile at the root of the project, making sure the indented lines are indented with tabs and not spaces:
start_single:
	iex --name dev@127.0.0.1 -S mix run

start_node1:
	MIX_ENV=dev1 iex --name dev1@127.0.0.1 -S mix run

start_node2:
	MIX_ENV=dev2 iex --name dev2@127.0.0.1 -S mix run

start_node3:
	MIX_ENV=dev3 iex --name dev3@127.0.0.1 -S mix run

clean:
	rm -rf data_1 data_2 data_3 data log ring_data_dir*

setup:
	mix deps.get
The project at this stage is available in the tag handoff-1
Now let's clean the data to start from scratch by running the clean target from the Makefile (make clean).
On a new terminal, start node1 with the start_node1 target from the Makefile (make start_node1).
Inside iex run:
This will insert k1: 1, k2: 2, ..., k100: 100. Since we are running a single node, they will all go to vnodes in node1.
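A loop of roughly this shape produces that key range. The put function here is a hypothetical stand-in for the put call of the key-value API built in the previous post, whose exact name does not appear in this section, so swap in the real call when running it inside iex.

```elixir
# Hypothetical stand-in for the key-value API's put call; it only echoes
# its arguments so the loop can run on its own.
put = fn k, v -> {:ok, k, v} end

# Keys are "k1" .. "k100" and each value is the matching integer.
results = for i <- 1..100, do: put.("k#{i}", i)

IO.inspect({length(results), List.first(results), List.last(results)})
# prints {100, {:ok, "k1", 1}, {:ok, "k100", 100}}
```

The same loop with the ranges 101..200 and 201..300 covers the keys inserted on node2 and node3.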
On a new terminal, start node2 with the start_node2 target from the Makefile (make start_node2).
Inside iex run:
This will insert k101: 101, k102: 102, ..., k200: 200. Since we haven't joined node2 to node1, they will all go to vnodes in node2.
On a new terminal, start node3 with the start_node3 target from the Makefile (make start_node3).
Inside iex run:
This will insert k201: 201, k202: 202, ..., k300: 300. Since we haven't joined node3 to node1 or node2, they will all go to vnodes in node3.
Let's join node2 and node3 to node1 to form a cluster. On node2 and node3 run:
You will see handoff transferring vnodes and their data between nodes, and stopping the vnodes on the old nodes once it has finished transferring and removing their data.
Periodically you can check the status with: