2013-03-24 80 views
0

我已经运行下面的地图了Riak阶段:了Riak二郎的map/reduce返回{错误,NOTFOUND}

-module(delete_map_function). 

-export([get_keys/3]). 

%Returns bucket and key pairs from a map phase 
get_keys(Value,_Keydata,_Arg) -> 
    [[riak_object:bucket(Value),riak_object:key(Value)]]. 

而下面了Riak减少阶段:http://contrib.basho.com/delete_keys.html

我不断收到此错误信息:

{"phase":0,"error":"function_clause","input":"{{error,notfound},{<<\"my_bucket\">>,<<\"item_key\">>},undefined}","type":"error","stack":"[{riak_object,bucket,[{error,notfound}],[{file,\"src/riak_object.erl\"},{line,251}]},{delete_map_function,get_keys,3,[{file,\"delete_map_function.erl\"},{line,7}]},{riak_kv_mrc_map,map,3,[{file,\"src/riak_kv_mrc_map.erl\"},{line,164}]},{riak_kv_mrc_map,process,3,[{file,\"src/riak_kv_mrc_map.erl\"},{line,140}]},{riak_pipe_vnode_worker,process_input,3,[{file,\"src/riak_pipe_vnode_worker.erl\"},{line,444}]},{riak_pipe_vnode_worker,wait_for_input,2,[{file,\"src/riak_pipe_vnode_worker.erl\"},{line,376}]},{gen_fsm,...},...]"} 

我通过Java运行作业:

MapReduceResult mapReduceResult = RiakUtils.getPBClient().mapReduce(iq) 
       .addMapPhase(new NamedErlangFunction("delete_map_function", "get_keys")) 
       .addReducePhase(new NamedErlangFunction("delete_reduce_function", "delete")) 
       .execute(); 

我读的地方,我应该使用在映射阶段的filter_notfound的说法,但我仍然不断收到错误,甚至将它添加后:

MapReduceResult mapReduceResult = RiakUtils.getPBClient().mapReduce(iq) 
       .addMapPhase(new NamedErlangFunction("delete_map_function", "get_keys"), "filter_notfound") 
       .addReducePhase(new NamedErlangFunction("delete_reduce_function", "delete")) 
       .execute(); 

我跑了Riak 1.3,并使用Riak Java client V1 .1.0

回答

1

首先。我认为这不是通过map/reduce阶段删除键的有效方法。如果你获得了键列表并将其提供给地图阶段,riak将首先读取所有对象,然后将其提供给你的函数。所以如果你只需要删除对象,最好不要阅读。

二。所有的map/reduce函数应该有以下例外写成:

  • 而不是Value,你可以得到{error, notfound}因为了Riak的最终性质。
  • 你也可以得到删除riak对象为Value。您可以知道该对象被特殊标记删除了dict:is_key(<<"X-Riak-Deleted">>, riak_object:get_metadata(RiakObj))

三。要解决您的错误,您应该从列表中筛选未找到的键:

get_keys({error, notfound},_Keydata,_Arg) -> 
    []; 
get_keys(Value,_Keydata,_Arg) -> 
    [[riak_object:bucket(Value),riak_object:key(Value)]].