Multi-Paxos with riak_ensemble Part 1

In this post I will go through the initial steps to set up a project using riak_ensemble and try its core APIs. We will do it manually in the shell on purpose; later (I hope) I will post how to build it properly in code.

First we create a new project. I'm using Erlang 19.3 and rebar3 3.4.3:

rebar3 new app name=cadena

Then add the riak_ensemble dependency to rebar.config; it should look like this:

{erl_opts, [debug_info]}.
{deps, [{riak_ensemble_ng, "2.4.0"}]}.

Now, in 3 different terminals, start 3 Erlang nodes:

rebar3 shell --name node1@127.0.0.1
rebar3 shell --name node2@127.0.0.1
rebar3 shell --name node3@127.0.0.1

Run the following on every node:

Timeout = 1000.
Ensemble = root.
K1 = <<"k1">>.

application:set_env(riak_ensemble, data_root, "data/" ++ atom_to_list(node())).
application:ensure_all_started(riak_ensemble).

We are setting an application variable that tells riak_ensemble where to store the data for each node: node1 will store it under data/node1@127.0.0.1, node2 under data/node2@127.0.0.1 and node3 under data/node3@127.0.0.1.
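As a quick sanity check, the path expression can be evaluated on its own, and the setting can be read back with application:get_env/2, a standard OTP call. This is only a sketch: ExamplePath is a name made up for illustration, and node1's name is hardcoded so the first expression gives the same result on any node.

```erlang
%% Evaluate the same expression used for data_root, with node1's name spelled out.
ExamplePath = "data/" ++ atom_to_list('node1@127.0.0.1').

%% Read back what this node has configured. get_env/2 returns {ok, Value}
%% when the key is set and undefined otherwise.
application:get_env(riak_ensemble, data_root).
```

Nothing here changes the configuration, so it is safe to run on any of the three nodes.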

After that we ensure all apps that riak_ensemble requires to run are started.

You should see something like this:

ok

18:05:50.548 [info] Application lager started on node 'node1@127.0.0.1'
18:05:50.558 [info] Application riak_ensemble started on node 'node1@127.0.0.1'
{ok,[syntax_tools,compiler,goldrush,lager,riak_ensemble]}

Now on node1 run:

riak_ensemble_manager:enable().

Output:

ok

We enable riak_ensemble_manager on one node only.

Then on node2 we join node1 and node3:

riak_ensemble_manager:join('node1@127.0.0.1', node()).
riak_ensemble_manager:join('node3@127.0.0.1', node()).

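Output on node2 (the second call returns remote_not_enabled, presumably because node3 is not yet part of an enabled cluster; we join it from node3 in the next step):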

18:06:39.285 [info] JOIN: success
ok
remote_not_enabled

The join also generates output on node1:

18:06:24.008 [info] {root,'node1@127.0.0.1'}: Leading
18:06:39.281 [info] join(Vsn): {1,64} :: 'node2@127.0.0.1' :: ['node1@127.0.0.1']

On node3 we join node1 and node2:

riak_ensemble_manager:join('node1@127.0.0.1', node()).
riak_ensemble_manager:join('node2@127.0.0.1', node()).

Output on node3:

18:07:36.078 [info] JOIN: success
ok

Output on node1:

18:07:36.069 [info] join(Vsn): {1,291} :: 'node3@127.0.0.1' :: ['node1@127.0.0.1','node2@127.0.0.1']
18:07:36.074 [info] join(Vsn): {1,292} :: 'node3@127.0.0.1' :: ['node1@127.0.0.1','node2@127.0.0.1','node3@127.0.0.1']

Run this on all nodes:

riak_ensemble_manager:check_quorum(Ensemble, Timeout).
riak_ensemble_peer:stable_views(Ensemble, Timeout).
riak_ensemble_manager:cluster().

Output:

true
{ok,true}
['node1@127.0.0.1','node2@127.0.0.1','node3@127.0.0.1']

Everything seems to be OK: we have a cluster!
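Right after joining, the checks above may not succeed on the first try. A small retry loop can wait for them; the sketch below is a hypothetical helper of my own, not part of riak_ensemble. Ready and WaitUntil are made-up names, and the only library calls used are the two check functions shown above, with the return values seen in the output (true and {ok,true}).

```erlang
%% Hypothetical check: true only when the ensemble reports quorum and stable views.
%% Any other reply counts as "not ready yet".
Ready = fun(Ens, Ms) ->
    Quorum = riak_ensemble_manager:check_quorum(Ens, Ms),
    Stable = riak_ensemble_peer:stable_views(Ens, Ms),
    {Quorum, Stable} =:= {true, {ok, true}}
end.

%% Generic retry loop: call Check() up to Retries times, sleeping SleepMs
%% milliseconds between attempts. Returns ok or {error, not_ready}.
WaitUntil = fun Loop(_Check, 0, _SleepMs) ->
                    {error, not_ready};
                Loop(Check, Retries, SleepMs) ->
                    case Check() of
                        true -> ok;
                        _ -> timer:sleep(SleepMs),
                             Loop(Check, Retries - 1, SleepMs)
                    end
            end.

%% On a node of the running cluster it could be used like this:
%% WaitUntil(fun() -> Ready(root, 1000) end, 10, 500).
```

Keeping the retry loop separate from the riak_ensemble check makes it possible to try the loop with any fun that returns true or false.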

Now we can write something. Let's set key "k1" to value "v1" on all nodes, using Paxos for consensus.

On node1 run:

V1 = <<"v1">>.
riak_ensemble_client:kover(node(), Ensemble, K1, V1, Timeout).

Output:

{ok,{obj,1,729,<<"k1">>,<<"v1">>}}

We can check on node2 that the value is available:

riak_ensemble_client:kget(node(), Ensemble, K1, Timeout).

Output:

{ok,{obj,1,729,<<"k1">>,<<"v1">>}}
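To get at the value itself, the reply can be taken apart with pattern matching. This is a sketch that assumes the tuple layout shown in the output above; the two integers seem to be the epoch and sequence number that kmodify hands to its function, and the variable names are mine. Matching on the raw tuple ties the code to this layout, so treat it as an illustration.

```erlang
%% Reply copied from the kget output above. In a live session it would come from
%% riak_ensemble_client:kget(node(), Ensemble, K1, Timeout).
Reply = {ok, {obj, 1, 729, <<"k1">>, <<"v1">>}}.

%% Destructure the tuple; the field names reflect my reading of its layout.
{ok, {obj, ObjEpoch, ObjSeq, ObjKey, ObjValue}} = Reply.

%% The stored value on its own.
ObjValue.
```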

Now we can try a different way to update a value. Say we want to set a new value that depends on the current one, or only if the current value is something specific. For that we use kmodify, which receives a function, calls it with the current value, and sets the key to the value the function returns.

On node3 run:

V2 = <<"v2">>.
DefaultVal = <<"v0">>.
ModifyTimeout = 5000.

riak_ensemble_peer:kmodify(node(), Ensemble, K1,
    fun({Epoch, Seq}, CurVal) ->
        io:format("CurVal: ~p ~p ~p to ~p~n", [Epoch, Seq, CurVal, V2]),
        V2
    end,
    DefaultVal, ModifyTimeout).

Output on node3:

{ok,{obj,1,914,<<"k1">>,<<"v2">>}}

Output on node1:

CurVal: 1 914 <<"v1">> to <<"v2">>

The call with a function as a parameter was made on node3, but the function ran on node1, the current leader. That's the advantage of using the Erlang virtual machine to build distributed systems.

Now let's check if the value was set on all nodes by checking it on node2:

riak_ensemble_client:kget(node(), Ensemble, K1, Timeout).

Output:

{ok,{obj,1,914,<<"k1">>,<<"v2">>}}
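The kmodify call above overwrites the value no matter what it was. A stricter compare-and-swap could look like the sketch below. MakeCas is a hypothetical helper, and the sketch relies on one assumption: that returning the atom failed from the function makes kmodify abort the write. That is how I read the riak_ensemble_peer source, but check it against the version you are using.

```erlang
%% Hypothetical helper: builds a modify fun that replaces Expected with New
%% and refuses to write when the current value is anything else.
MakeCas = fun(Expected, New) ->
    fun(_Vsn, CurVal) when CurVal =:= Expected ->
            New;
       (_Vsn, _Other) ->
            %% ASSUMPTION: returning 'failed' makes kmodify abort the write.
            failed
    end
end.

%% On node3, where DefaultVal and ModifyTimeout are bound, it could be passed
%% like this. The current value is <<"v2">>, so expecting <<"v1">> does not match:
%% riak_ensemble_peer:kmodify(node(), Ensemble, K1,
%%     MakeCas(<<"v1">>, <<"v3">>), DefaultVal, ModifyTimeout).
```

If the assumption holds, the commented call leaves the key at "v2", so the rest of this walkthrough is unaffected.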

Now let's quit on all nodes:

q().

Let's start the cluster again to see if riak_ensemble remembers things. In 3 different terminals run:

rebar3 shell --name node1@127.0.0.1
rebar3 shell --name node2@127.0.0.1
rebar3 shell --name node3@127.0.0.1

On every node:

Timeout = 1000.
Ensemble = root.
K1 = <<"k1">>.

application:set_env(riak_ensemble, data_root, "data/" ++ atom_to_list(node())).
application:ensure_all_started(riak_ensemble).

We set the data_root again and start riak_ensemble and its dependencies. After that, on node1 we should see:

18:11:55.286 [info] {root,'node1@127.0.0.1'}: Leading

Now let's check that the cluster was initialized correctly:

riak_ensemble_manager:check_quorum(Ensemble, Timeout).
riak_ensemble_peer:stable_views(Ensemble, Timeout).
riak_ensemble_manager:cluster().

Output:

true
{ok,true}
['node1@127.0.0.1','node2@127.0.0.1','node3@127.0.0.1']

You can now check on any node whether the key is still set:

riak_ensemble_client:kget(node(), Ensemble, K1, Timeout).

Output should be similar to this (the epoch and sequence numbers differ from the ones seen before the restart):

{ok,{obj,2,275,<<"k1">>,<<"v2">>}}
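The value survived the restart, but the two numbers in front of the key changed. Assuming they act as a version in which the epoch is compared first and the sequence number second, plain Erlang tuple comparison is enough to tell which of two reads is newer. The tuples below are copied from the outputs in this post; the variable names are only for illustration.

```erlang
%% {Epoch, Seq} pairs taken from the kget outputs before and after the restart.
BeforeRestart = {1, 914}.
AfterRestart = {2, 275}.

%% Tuples of the same size compare element by element, so the epoch decides
%% first and the sequence number only breaks ties.
AfterRestart > BeforeRestart.
```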

Check the generated files under the data folder:

$ tree data

data
├── node1@127.0.0.1
│   └── ensembles
│       ├── 1394851733385875569783788015140658786474476408261_kv
│       ├── ensemble_facts
│       └── ensemble_facts.backup
├── node2@127.0.0.1
│   └── ensembles
│       ├── ensemble_facts
│       └── ensemble_facts.backup
└── node3@127.0.0.1
    └── ensembles
        ├── ensemble_facts
        └── ensemble_facts.backup

6 directories, 7 files

To sum up: we created a project, added riak_ensemble as a dependency, started a 3 node cluster and joined all the nodes. We wrote a key with a value, checked that it was available on other nodes, and updated the value with kmodify, the building block for "compare and swap" style operations. Finally we stopped the cluster, started it again, and checked that it came back as it was with the value still there.
