2016-02-18 24 views
2

我有一个提交钩子(commit hook),想在其中更新一个计数器。(标题:在 Riak 提交钩子中递增 CRDT 计数器)

目前我有:

%% Pre-commit hook module: every invocation applies an {increment, 1}
%% operation to one fixed counter object and then returns the incoming
%% object untouched.
-module(bucket_counter).
-export([precommit/1]).

%% Operation record handed to riak_kv_crdt:update/3.
%% NOTE(review): presumably has to mirror riak_kv's own #crdt_op{} layout -- confirm.
-record(crdt_op, {mod, op, ctx}).

-define(V1_COUNTER_TYPE, riak_kv_pncounter).

%% @doc Pre-commit entry point.
%% Loads (or creates) the counter object stored under key <<"all">> in
%% {<<"counters">>, <<"update_counts">>}, applies an increment of 1 through
%% riak_kv_crdt:update/3, writes the result back with the local client and
%% finally returns RObj exactly as it was received.
%% NOTE(review): the log accompanying this code reports a function_clause in
%% riak_kv_crdt:update_crdt for this call (the op carried ctx = <<>>).
precommit(RObj) ->
    {ok, Client} = riak:local_client(),
    Bucket = {<<"counters">>, <<"update_counts">>},
    Key = <<"all">>,
    Current = load_counter(Client, Bucket, Key),
    io:format("Counter ~p~n", [Current]),

    %% Only the context part of the value is used; the rest is discarded.
    {{Ctx, _}, _} = riak_kv_crdt:value(Current, ?V1_COUNTER_TYPE),
    Increment = #crdt_op{mod = ?V1_COUNTER_TYPE, op = {increment, 1}, ctx = Ctx},
    %% <<"1234">> is a fixed second argument -- presumably the actor id; confirm.
    Updated = riak_kv_crdt:update(Current, <<"1234">>, Increment),
    io:format("Gen ~p~n", [Updated]),

    %% An explicit riak_kv_crdt:merge/1 step was tried here and left disabled.
    Client:put(Updated),
    RObj.

%% Fetch the counter object, or build a fresh one when it is not stored yet.
%% NOTE(review): the fresh object holds the plain term 0 (riak_object:new/3),
%% not a CRDT value -- verify this is what riak_kv_crdt expects.
load_counter(Client, Bucket, Key) ->
    case Client:get(Bucket, Key) of
        {ok, Existing} -> Existing;
        {error, notfound} -> riak_object:new(Bucket, Key, 0)
    end.

这会产生错误:

@riak_kv_put_fsm:decode_precommit:887 Problem invoking pre-commit hook bucket_counter:precommit -> error:function_clause 
[{riak_kv_crdt,update_crdt,[[{riak_dt_pncounter,{{dict,5,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],[],[],[[<<"dot">>|{<<252,97,47,244,1,104,136,26>>,{1,63622971158}}]],[],[],[],[],[],[[<<"content-type">>,97,112,112,108,105,99,97,116,105,111,110,47,114,105,97,107,95,99,111,117,110,116,101,114],[<<"X-Riak-VTag">>,53,87,81,75,79,113,110,77,80,111,117,49,71,101,120,122,52,67,89,115,66,122]],[[<<"index">>]],[],[[<<"X-Riak-Last-Modified">>|{1455,751958,325156}]],[],[]}}},{crdt,riak_dt_pncounter,"application/riak_counter",[{<<252,97,47,244,1,104,136,26>>,1,0}]}}}],<<"1234">>,{crdt_op,riak_kv_pncounter,{increment,1},<<>>}],[{file,"src/riak_kv_crdt.erl"},{line,234}]},{riak_kv_crdt,update,3,[{file,"src/riak_kv_crdt.erl"},{line,61}]},{bucket_counter,precommit,1,[{file,"bucket_counter.erl"},{line,24}]},{riak_kv_put_fsm,invoke_hook,4,[{file,"src/riak_kv_put_fsm.erl"},{line,853}]},{riak_kv_put_fsm,precommit,2,[{file,"src/riak_kv_put_fsm.erl"},{line,503}]},{gen_fsm,handle_msg,7,[{file,"gen_fsm.erl"},{line,505}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,239}]}] 

查看 riak_kv 的源码,看起来在带有上下文(context)的情况下,合并(merge)只对 map 和 set 类型有效:

https://github.com/basho/riak_kv/blob/master/src/riak_kv_crdt.erl#L246

我试过去掉上下文(context),但随后因为出现兄弟对象(siblings)而崩溃:

riak_index:parse_object_hook:116 Siblings not allowed: {r_object,{<<"counters">>,<<"update_counts">>},<<"all">>,[{r_content,{dict,2,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],[],[],[],[],[],[],[],[],[[<<"X-Riak-VTag">>,49,67,49,85,69,121,66,118,111,84,56,85,118,81,104,104,106,74,101,98,49,71]],[],[],[[<<"X-Riak-Last-Modified">>|{1455,750950,430934}]],[],[]}}},<<69,2,0,0,0,17,114,105,97,107,95,100,116,95,112,110,99,111,117,110,116,101,114,71,2,131,108,0,0,0,1,104,3,109,0,0,0,8,252,97,47,244,156,20,105,128,97,1,97,0,106>>},{r_content,{dict,2,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],[],[],[],[],[],[],[],[],[[<<"X-Riak-VTag">>,49,67,49,85,69,121,66,118,111,84,56,85,118,81,104,104,106,74,101,98,49,71]],[],[],[[<<"X-Riak-Last-Modified">>|{1455,750950,430934}]],[],[]}}},<<69,1,71,1,0,0,0,22,70,1,131,108,0,0,0,1,104,2,109,0,0,0,4,49,50,51,52,97,1,106,0,0,0,4,70,1,131,106>>}],[{<<252,97,47,244,156,20,105,128>>,{1,63622970068}}],{dict,1,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[[clean|true]],[]}}},undefined} 

改用 map 类型之后,我得到的是一个兄弟对象(siblings)错误:

%% Pre-commit hook module (map variant): every invocation increments the
%% <<"count">> field of one fixed map object and then returns the incoming
%% object untouched.
-module(bucket_counter).
-export([precommit/1]).

%% Operation record handed to riak_kv_crdt:update/3.
%% NOTE(review): presumably has to mirror riak_kv's own #crdt_op{} layout -- confirm.
-record(crdt_op, {mod, op, ctx}).

%% Kept from the counter-based version; not referenced by the code below.
-define(V1_COUNTER_TYPE, riak_kv_pncounter).


%% @doc Pre-commit entry point.
%% Loads (or creates) the object stored under key <<"all">> in
%% {<<"counters">>, <<"update_counts">>}, applies a riak_dt_map update that
%% increments the {<<"count">>, riak_dt_emcntr} field, writes the result back
%% with the local client and finally returns RObj exactly as it was received.
precommit(RObj) ->
    {ok, Client} = riak:local_client(),
    Bucket = {<<"counters">>, <<"update_counts">>},
    Key = <<"all">>,
    {Current, Ctx} = load_counter(Client, Bucket, Key),
    io:format("Counter ~p~n", [Current]),

    FieldUpdate = {update, {<<"count">>, riak_dt_emcntr}, increment},
    MapOp = #crdt_op{mod = riak_dt_map, op = {update, [FieldUpdate]}, ctx = Ctx},
    %% <<"1234">> is a fixed second argument -- presumably the actor id; confirm.
    Updated = riak_kv_crdt:update(Current, <<"1234">>, MapOp),
    io:format("Gen ~p~n", [Updated]),

    %% An explicit riak_kv_crdt:merge/1 step was tried here and left disabled.
    Client:put(Updated),
    RObj.

%% Return {Object, Context}: the stored object with the context taken from its
%% map value, or a freshly built object paired with `undefined` when nothing
%% is stored yet.
%% NOTE(review): the fresh object holds the plain term 0 (riak_object:new/3);
%% the reported siblings error shows a content with value 0 next to the map
%% content, so riak_kv_crdt:new(Bucket, Key, riak_dt_map) is presumably what
%% is needed here -- confirm before changing.
load_counter(Client, Bucket, Key) ->
    case Client:get(Bucket, Key) of
        {ok, Existing} ->
            {{MapCtx, _}, _} = riak_kv_crdt:value(Existing, riak_dt_map),
            {Existing, MapCtx};
        {error, notfound} ->
            {riak_object:new(Bucket, Key, 0), undefined}
    end.



riak_index:parse_object_hook:116 Siblings not allowed: {r_object,{<<"counters">>,<<"update_counts">>},<<"all">>,[{r_content,{dict,2,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],[],[],[],[],[],[],[],[],[[<<"X-Riak-VTag">>,74,73,109,71,89,110,100,82,71,101,88,80,71,107,72,105,53,113,78,48,90]],[],[],[[<<"X-Riak-Last-Modified">>|{1455,756505,963692}]],[],[]}}},<<69,2,0,0,0,11,114,105,97,107,95,100,116,95,109,97,112,77,1,131,80,0,0,0,132,120,1,203,96,206,97,96,96,96,204,96,202,5,82,44,134,70,198,38,137,140,89,80,33,136,32,107,114,126,105,94,73,10,3,95,81,102,98,118,124,74,73,124,106,110,114,94,73,81,6,19,138,42,168,86,184,40,22,227,224,66,25,204,137,140,64,200,144,149,149,193,148,5,4,0,198,254,26,118>>},{r_content,{dict,2,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],[],[],[],[],[],[],[],[],[[<<"X-Riak-VTag">>,74,73,109,71,89,110,100,82,71,101,88,80,71,107,72,105,53,113,78,48,90]],[],[],[[<<"X-Riak-Last-Modified">>|{1455,756505,963692}]],[],[]}}},0}],[],{dict,1,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[[clean|true]],[]}}},undefined} 
2016-02-18 00:48:25.966 [debug] <0.2105.0>@riak_kv_put_fsm:decode_precommit:880 Pre-commit hook riak_index:parse_object_hook failed with reason {siblings_not_allowed,{r_object,{<<"counters">>,<<"update_counts">>},<<"all">>,[{r_content,{dict,2,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],[],[],[],[],[],[],[],[],[[<<"X-Riak-VTag">>,74,73,109,71,89,110,100,82,71,101,88,80,71,107,72,105,53,113,78,48,90]],[],[],[[<<"X-Riak-Last-Modified">>|{1455,756505,963692}]],[],[]}}},<<69,2,0,0,0,11,114,105,97,107,95,100,116,95,109,97,112,77,1,131,80,0,0,0,132,120,1,203,96,206,97,96,96,96,204,96,202,5,82,44,134,70,198,38,137,140,89,80,33,136,32,107,114,126,105,94,73,10,3,95,81,102,98,118,124,74,73,124,106,110,114,94,73,81,6,19,138,42,168,86,184,40,22,227,224,66,25,204,137,140,64,200,144,149,149,193,148,5,4,0,198,254,26,118>>},{r_content,{dict,2,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],[],[],[],[],[],[],[],[],[[<<"X-Riak-VTag">>,74,73,109,71,89,110,100,82,71,101,88,80,71,107,72,105,53,113,78,48,90]],[],[],[[<<"X-Riak-Last-Modified">>|{1455,756505,963692}]],[],[]}}},0}],[],{dict,1,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[[clean|true]],[]}}},undefined}} 

回答

2

我已经弄明白了:我之前参照的是 1.4 版的文档,而实际上我需要调用的是 riak_kv_crdt:new(Bucket, Id, riak_dt_map)。