How to use leveled, a pure erlang leveldb implementation

Yesterday, at the riak_core tutorial at CodeBEAM SF, I was trying to implement a leveled-based backend for the key-value store we were building. I was having trouble with leveled crashing when I tried to destroy it (stop it and remove its files, in leveled parlance). After fighting with it for a while, I needed a smaller example to see whether it was my mistake or not.

I decided to do the smaller example and to share the process here.

First we need some erlang application to hold our leveled dependency and configuration, let's do it by creating an erlang release with rebar3:

rebar3 new release name=lvld
cd lvld

Now that the skeleton is ready, we need to change rebar.config to add the information needed to use leveled. The resulting rebar.config is below; see the comments:

%% Compiler options for the lvld application itself.
{erl_opts, [debug_info]}.

%% leveled is fetched from the master branch of its git repository.
{deps, [
    {leveled,
        {git, "https://github.com/martinsumner/leveled.git", {branch, "master"}}}
]}.

{relx, [
    {release, {lvld, "0.1.0"}, [
        lvld,
        %% crypto is needed by leveled
        crypto,
        %% leveled is not an app: make sure it is loaded, but do not start it
        {leveled, load},
        %% lz4 is required by leveled; it is only loaded as well
        {lz4, load},
        sasl
    ]},

    {sys_config, "./config/sys.config"},
    {vm_args, "./config/vm.args"},

    {dev_mode, true},
    {include_erts, false},

    {extended_start_script, true}
]}.

{profiles, [
    {prod, [
        {relx, [
            {dev_mode, false},
            {include_erts, true}
        ]}
    ]}
]}.

%% leveled sets warnings_as_errors and generates lots of warnings, so its
%% erl_opts are overridden here with a copy that leaves warnings_as_errors out.
{overrides, [
    {override, leveled, [
        {erl_opts, [
            {platform_define, "^1[7-8]{1}", old_rand},
            {platform_define, "^R", old_rand},
            {platform_define, "^R", no_sync}
        ]}
    ]}
]}.

We will build a wrapper for leveled that exposes a simple kv store in apps/lvld/src/lvld_kv.erl:

%% Simple key-value store wrapper around leveled: every storage operation in
%% this module is delegated to leveled_bookie.
-module(lvld_kv).
%% Public API. Note that delete/3 deletes a single key through the bookie,
%% while delete/1 removes the store's path from disk.
-export([new/1, get/3, put/4, delete/3, keys/2, close/1, delete/1, is_empty/1, foldl/3]).

%% NOTE(review): presumably this header provides the ?STD_TAG macro used by the
%% fold queries in this module -- confirm against leveled.
-include_lib("leveled/include/leveled.hrl").

%% Store handle returned by new/1 and passed to every other API call:
%%   bookie    - the value returned by leveled_bookie:book_start/4
%%   base_path - the path the store was opened with (removed by delete/1)
-record(state, {bookie, base_path}).

%% @doc Start a leveled bookie on the given path and wrap it in a store handle.
%%
%% The options map must contain the key path. Optional keys and the defaults
%% used when they are absent: ledger_cache_size (2000), journal_size
%% (500000000) and sync_strategy (none). All four values are handed to
%% leveled_bookie:book_start/4 in that order.
%%
%% Returns {ok, State}. Fails with a badmatch if the bookie does not start
%% with {ok, Bookie}.
new(#{path := RootPath} = Options) ->
    Option = fun(Name, Default) -> maps:get(Name, Options, Default) end,
    {ok, BookiePid} = leveled_bookie:book_start(
        RootPath,
        Option(ledger_cache_size, 2000),
        Option(journal_size, 500000000),
        Option(sync_strategy, none)
    ),
    {ok, #state{bookie = BookiePid, base_path = RootPath}}.

%% @doc Store Value under {Bucket, Key}.
%%
%% Calls leveled_bookie:book_put/5 with an empty list as its last argument and
%% returns {Result, State}, where Result is whatever book_put/5 returned and
%% State is the handle that was passed in, unchanged.
put(#state{bookie = BookiePid} = Store, Bucket, Key, Value) ->
    {leveled_bookie:book_put(BookiePid, Bucket, Key, Value, []), Store}.

%% @doc Look up the value stored under {Bucket, Key}.
%%
%% Returns {{found, {{Bucket, Key}, Value}}, State} when the bookie answers
%% {ok, Value}, and {{not_found, {Bucket, Key}}, State} when it answers
%% not_found. Any other answer from leveled_bookie:book_get/3 crashes with a
%% case_clause error.
get(#state{bookie = BookiePid} = Store, Bucket, Key) ->
    Id = {Bucket, Key},
    case leveled_bookie:book_get(BookiePid, Bucket, Key) of
        {ok, Value} -> {{found, {Id, Value}}, Store};
        not_found -> {{not_found, Id}, Store}
    end.

%% @doc Delete the entry stored under {Bucket, Key}.
%%
%% Calls leveled_bookie:book_delete/4 with an empty list as its last argument
%% and returns {Result, State}, where Result is whatever book_delete/4
%% returned and State is the handle that was passed in, unchanged.
delete(#state{bookie = BookiePid} = Store, Bucket, Key) ->
    {leveled_bookie:book_delete(BookiePid, Bucket, Key, []), Store}.

%% @doc List the keys in Bucket.
%%
%% Builds a foldheads_bybucket query for ?STD_TAG over the key range all,
%% obtains an async folder from leveled_bookie:book_returnfolder/2 and runs
%% it immediately. The fold fun ignores the bucket and the third argument and
%% prepends each key to the accumulator.
%%
%% Returns {Keys, State}.
keys(#state{bookie = BookiePid} = Store, Bucket) ->
    CollectKey = fun(_Bucket, Key, _Head, Collected) -> [Key | Collected] end,
    %% NOTE(review): the three trailing flags are passed through as in the
    %% original (true, true, false); confirm their meaning in leveled's docs.
    Query = {foldheads_bybucket, ?STD_TAG, Bucket, all, CollectKey, true, true, false},
    {async, Runner} = leveled_bookie:book_returnfolder(BookiePid, Query),
    {Runner(), Store}.

%% @doc Tell whether the store has no buckets.
%%
%% Runs a binary_bucketlist query for ?STD_TAG that collects every reported
%% bucket into a list starting from []. The store counts as empty exactly
%% when that fold returns [].
%%
%% Returns {true | false, State}.
is_empty(#state{bookie = BookiePid} = Store) ->
    CollectBucket = fun(BucketName, Seen) -> [BucketName | Seen] end,
    Query = {binary_bucketlist, ?STD_TAG, {CollectBucket, []}},
    {async, Runner} = leveled_bookie:book_returnfolder(BookiePid, Query),
    {Runner() =:= [], Store}.

%% @doc Close the bookie behind this store handle.
%%
%% Returns {Result, State}, where Result is whatever
%% leveled_bookie:book_close/1 returned. No files are removed here; see
%% delete/1 for that.
close(#state{bookie = BookiePid} = Store) ->
    {leveled_bookie:book_close(BookiePid), Store}.

%% @doc Remove the store's base path from disk.
%%
%% Delegates to remove_path/1 on the path the store was opened with and
%% returns {Result, State}, where Result is the value returned by
%% remove_path/1. The bookie itself is not contacted by this function.
delete(#state{base_path = Dir} = Store) ->
    {remove_path(Dir), Store}.


%% @doc Fold Fun over every object in the store.
%%
%% Fun is called as Fun({{Bucket, Key}, Value}, AccIn) and must return the
%% next accumulator; Acc0 is the initial accumulator. The fold is expressed
%% as a foldobjects_allkeys query for ?STD_TAG and run immediately.
%%
%% Returns {AccOut, State}.
foldl(Fun, Acc0, #state{bookie = BookiePid} = Store) ->
    %% Adapt leveled's 4-argument callback to the {{Bucket, Key}, Value} shape.
    Visit = fun(Bucket, Key, Value, AccIn) -> Fun({{Bucket, Key}, Value}, AccIn) end,
    %% NOTE(review): the trailing true is passed through as in the original;
    %% confirm its meaning in leveled's docs.
    Query = {foldobjects_allkeys, ?STD_TAG, {Visit, Acc0}, true},
    {async, Runner} = leveled_bookie:book_returnfolder(BookiePid, Query),
    {Runner(), Store}.

% private functions

%% List the entries of directory Dir, each joined onto Dir so that the result
%% is a list of paths rather than bare names. Fails with a badmatch if the
%% directory cannot be listed.
sub_files(Dir) ->
    {ok, Names} = file:list_dir(Dir),
    lists:map(fun(Name) -> filename:join(Dir, Name) end, Names).

%% Recursively remove Path. A path that is not a directory is removed with
%% file:delete/1. A directory has each of its children removed first (their
%% individual results are ignored) and is then removed with file:del_dir/1.
%% Returns the result of that final file:delete/1 or file:del_dir/1 call.
remove_path(Path) ->
    remove_path(filelib:is_dir(Path), Path).

%% Dispatch on whether the path is a directory.
remove_path(false, File) ->
    file:delete(File);
remove_path(true, Dir) ->
    lists:foreach(fun remove_path/1, sub_files(Dir)),
    file:del_dir(Dir).

We are ready to build a release and try our kv api on the repl:

rebar3 release
./_build/default/rel/lvld/bin/lvld console

This is the code we will run in the repl, I put it here so it's easy to read and copy and paste:

% Build ten bucket names (bucket-1 .. bucket-10) and ten key names
% (key-1 .. key-10) as binaries.
Nums = lists:seq(1, 10).
Buckets = lists:map(fun (N) -> list_to_binary("bucket-" ++ integer_to_list(N)) end,
Nums).
Keys = lists:map(fun (N) -> list_to_binary("key-" ++ integer_to_list(N)) end, Nums).

% The value stored for a bucket/key pair is <<"v/Bucket/Key">>.
GenValue = fun (Bucket, Key) -> <<"v/", Bucket/binary, "/", Key/binary>> end.

% Open the store, then write one value for every bucket/key combination.
{ok, Kv} = lvld_kv:new(#{path => "/tmp/lvld_test"}).
lists:foreach(fun (Bucket) ->
        lists:foreach(fun (Key) ->
                Val = GenValue(Bucket, Key),
                lvld_kv:put(Kv, Bucket, Key, Val)
        end, Keys)
end, Buckets).


% Names used by the single-key calls below (B2 and K2 are bound but not used
% by any of the calls that follow).
B1 = <<"bucket-1">>.
K1 = <<"key-1">>.
V1 = <<"value-1">>.
B2 = <<"bucket-2">>.
K2 = <<"key-2">>.

% Collect every stored object as a {Bucket, Key, Value} tuple.
FoldFn = fun ({{B, K}, V}, AccIn) -> [{B, K, V} | AccIn] end.
lvld_kv:foldl(FoldFn, [], Kv).

% Overwrite one entry, read it back, delete it, and read it again.
lvld_kv:put(Kv, B1, K1, V1).
lvld_kv:get(Kv, B1, K1).
lvld_kv:delete(Kv, B1, K1).
lvld_kv:get(Kv, B1, K1).

% List the keys remaining in bucket-1.
lvld_kv:keys(Kv, B1).

% Close the bookie first, then remove the store's files (delete/1).
lvld_kv:close(Kv).
lvld_kv:delete(Kv).

The results of running it (removing some of the verbose logging):

(lvld@ganesha)1> Nums = lists:seq(1, 10).

[1,2,3,4,5,6,7,8,9,10]


(lvld@ganesha)2> Buckets = lists:map(fun (N) -> list_to_binary("bucket-" ++ integer_to_list(N)) end, Nums).

[<<"bucket-1">>,<<"bucket-2">>,<<"bucket-3">>,
 <<"bucket-4">>,<<"bucket-5">>,<<"bucket-6">>,<<"bucket-7">>,
  <<"bucket-8">>,<<"bucket-9">>,<<"bucket-10">>]


(lvld@ganesha)3> Keys = lists:map(fun (N) -> list_to_binary("key-" ++ integer_to_list(N)) end, Nums).

  [<<"key-1">>,<<"key-2">>,<<"key-3">>,<<"key-4">>,
   <<"key-5">>,<<"key-6">>,<<"key-7">>,<<"key-8">>,<<"key-9">>,
    <<"key-10">>]


(lvld@ganesha)4> GenValue = fun (Bucket, Key) -> <<"v/", Bucket/binary, "/", Key/binary>> end.

#Fun<erl_eval.12.99386804>


(lvld@ganesha)5> {ok, Kv} = lvld_kv:new(#{path => "/tmp/lvld_test"}).

{ok,{state,<0.264.0>,"/tmp/lvld_test"}}

(lvld@ganesha)6> B1 = <<"bucket-1">>.
<<"bucket-1">>

(lvld@ganesha)7> K1 = <<"key-1">>.
<<"key-1">>

(lvld@ganesha)8> V1 = <<"value-1">>.
<<"value-1">>

(lvld@ganesha)9> B2 = <<"bucket-2">>.
<<"bucket-2">>

(lvld@ganesha)10> K2 = <<"key-2">>.
<<"key-2">>

(lvld@ganesha)11> FoldFn = fun ({{B, K}, V}, AccIn) -> [{B, K, V} | AccIn] end.
#Fun<erl_eval.12.99386804>

(lvld@ganesha)13> lists:foreach(fun (Bucket) ->
(lvld@ganesha)13>         lists:foreach(fun (Key) ->
(lvld@ganesha)13>                 Val = GenValue(Bucket, Key),
(lvld@ganesha)13>                 lvld_kv:put(Kv, Bucket, Key, Val)
(lvld@ganesha)13>         end, Keys)
(lvld@ganesha)13> end, Buckets).

(lvld@ganesha)14> lvld_kv:foldl(FoldFn, [], Kv).

{[{<<"bucket-9">>,<<"key-9">>,<<"v/bucket-9/key-9">>},
  {<<"bucket-9">>,<<"key-8">>,<<"v/bucket-9/key-8">>},
  {<<"bucket-9">>,<<"key-7">>,<<"v/bucket-9/key-7">>},
  {<<"bucket-9">>,<<"key-6">>,<<"v/bucket-9/key-6">>},
  {<<"bucket-9">>,<<"key-5">>,<<"v/bucket-9/key-5">>},
  {<<"bucket-9">>,<<"key-4">>,<<"v/bucket-9/key-4">>},
  {<<"bucket-9">>,<<"key-3">>,<<"v/bucket-9/key-3">>},
  {<<"bucket-9">>,<<"key-2">>,<<"v/bucket-9/key-2">>},
  {<<"bucket-9">>,<<"key-10">>,<<"v/bucket-9/key-10">>},
  {<<"bucket-9">>,<<"key-1">>,<<"v/bucket-9/key-1">>},
  {<<"bucket-8">>,<<"key-9">>,<<"v/bucket-8/key-9">>},
  {<<"bucket-8">>,<<"key-8">>,<<"v/bucket-8/key-8">>},
  {<<"bucket-8">>,<<"key-7">>,<<"v/bucket-8/key-7">>},
  {<<"bucket-8">>,<<"key-6">>,<<"v/bucket-8/key-6">>},
  {<<"bucket-8">>,<<"key-5">>,<<"v/bucket-8/key-5">>},
  {<<"bucket-8">>,<<"key-4">>,<<"v/bucket-8/key-4">>},
  {<<"bucket-8">>,<<"key-3">>,<<"v/bucket-8/key-3">>},
  {<<"bucket-8">>,<<"key-2">>,<<"v/bucket-8/key-2">>},
  {<<"bucket-8">>,<<"key-10">>,<<"v/bucket-8/key-10">>},
  {<<"bucket-8">>,<<"key-1">>,<<"v/bucket-8/key-1">>},
  {<<"bucket-7">>,<<"key-9">>,<<"v/bucket-7/key-9">>},
  {<<"bucket-7">>,<<"key-8">>,<<"v/bucket-7/k"...>>},
  {<<"bucket-7">>,<<"key-7">>,<<"v/bucket"...>>},
  {<<"bucket-7">>,<<"key-6">>,<<"v/bu"...>>},
  {<<"bucket-7">>,<<"key-"...>>,<<...>>},
  {<<"buck"...>>,<<...>>,...},
  {<<...>>,...},
  {...}|...],

 {state,<0.264.0>,"/tmp/lvld_test"}}

(lvld@ganesha)15> lvld_kv:put(Kv, B1, K1, V1).
{ok,{state,<0.264.0>,"/tmp/lvld_test"}}

(lvld@ganesha)16> lvld_kv:get(Kv, B1, K1).
{{found,{{<<"bucket-1">>,<<"key-1">>},<<"value-1">>}},
 {state,<0.264.0>,"/tmp/lvld_test"}}

(lvld@ganesha)17> lvld_kv:delete(Kv, B1, K1).
{ok,{state,<0.264.0>,"/tmp/lvld_test"}}

(lvld@ganesha)18> lvld_kv:get(Kv, B1, K1).
{{not_found,{<<"bucket-1">>,<<"key-1">>}},
 {state,<0.264.0>,"/tmp/lvld_test"}}

(lvld@ganesha)19> lvld_kv:keys(Kv, B1).
{[<<"key-9">>,<<"key-8">>,<<"key-7">>,<<"key-6">>,
  <<"key-5">>,<<"key-4">>,<<"key-3">>,<<"key-2">>,
  <<"key-10">>],
 {state,<0.264.0>,"/tmp/lvld_test"}}

(lvld@ganesha)20> lvld_kv:close(Kv).
{ok,{state,<0.264.0>,"/tmp/lvld_test"}}

(lvld@ganesha)21> lvld_kv:delete(Kv).
{ok,{state,<0.264.0>,"/tmp/lvld_test"}}

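In case you want to know the cause of the crash: when destroy is called on leveled, it returns destroy as the reason for the gen_server stop. A gen_server that stops with any reason other than normal, shutdown or {shutdown, Term} is treated as having terminated with an error, so Erlang is not happy: it crashes the process and propagates the error.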

The solution here is to just close it and remove the files myself (the difference between close and destroy is file removal).

Comments

Comments powered by Disqus