0

我正在尝试用 Erlang 在 Riak 上做 mapreduce。我有如下数据:

Bucket = "Numbers"
{Keys,values} = {Random key,1},{Random key,2}........{Random key,1000}. 

现在,我存储了从 1 到 1000 的 1000 个值,其中所有键都是由作为参数给出的术语 undefined 自动生成的,因此所有键的值都从 1 到 1000 开始。

所以我只想要偶数值的数据。使用 mapreduce,我该如何实现呢?

4

1 回答 1

0

您将按照http://docs.basho.com/riak/latest/dev/advanced/mapreduce/中的描述构建相位函数

一种可能的地图功能:

Mapfun = fun(Object, _KeyData, _Arg) ->
    %% get the object value, convert to integer and check if even
    Value = list_to_integer(binary_to_term(riak_object:get_value(Object))),
    case Value rem 2 of
      0 -> [Value];
      1 -> []
    end
end.

尽管您可能不想在遇到兄弟姐妹时完全失败:

Mapfun = fun(Object, _KeyData, _Arg) ->
    Values = riak_object:get_values(Object),
    case length(Values) of        %% checking for siblings
      1 ->                   %% only 1 value == no siblings
         I = list_to_integer(binary_to_term(hd(Values))),
         case I rem 2 of
            0 -> [I];    %% value is even
            1 -> []          %% value is odd
         end;
      _ -> []                %% What should happen with siblings?
    end
end.

您可能还需要防止或检查其他情况:包含非数字字符的值、空值、已删除的值(墓碑),仅举几例。

编辑:
请注意:执行完整的 MapReduce 作业将需要 Riak 从磁盘读取每个值,这可能会导致大量数据集出现极端延迟和超时。可能不是您想在生产中做的事情。


执行 MapReduce 的完整示例(出于空间考虑,限于数字 1 到 200):

假设您已经 使用上面的第二个 Mapfun克隆并构建了riak-erlang-client

erl -pa {path-to-riak-erlang-client}/ebin

定义一个reduce函数对列表进行排序

Reducefun = fun(List,_) -> 
    lists:sort(List) 
end.

连接到本地 Riak 服务器

{ok, Pid} = riakc_pb_socket:start_link("127.0.0.1", 8087).

生成一些测试数据

[ riakc_pb_socket:put(
                  Pid,
                  riakc_obj:new(
                            <<"numbers">>,
                            list_to_binary("Key" ++ V),V
                            )
                  ) || V <- [ integer_to_list(Itr) || Itr <- lists:seq(1,200)]],

使用此客户端执行 MapReduce 的功能是
mapred(pid(), mapred_inputs(), [mapred_queryterm()])

这是自述文件中定义mapred_queryterm的形式的阶段规范列表。对于此示例,有 2 个阶段:{Type, FunTerm, Arg, Keep}

  • 仅选择偶数的映射阶段
    {map, Mapfun, none, true}
  • 对结果进行排序的归约阶段
    {reduce, Reducefun, none, true}

执行 MapReduce 查询

{ok,Results} = riakc_pb_socket:mapred(
                              Pid,  %% The socket pid from above
                              <<"numbers">>,  %% Input is the bucket 
                              [{map,{qfun,Mapfun},none,true},
                              {reduce,{qfun,Reducefun},none,true}]
                                     ),

Results将是一个列表,[{_Phase Index_, _Phase Output_}]每个阶段都有一个单独的条目Keep,在这个例子中,两个阶段都被标记为保持,所以在这个例子Results中将是
[{0,[_map phase result_]},{1,[_reduce phase result_]}]

打印出每个阶段的结果:

[ io:format("MapReduce Result of phase ~p:~n~P~n",[P,Result,500]) 
                || {P,Result} <- Results ].

当我运行它时,我的输出是:

MapReduce Result of phase 0:
[182,132,174,128,8,146,18,168,70,98,186,118,50,28,22,112,82,160,114,106,12,26,
 124,14,194,64,122,144,172,96,126,162,58,170,108,44,90,104,6,196,40,154,94,
 120,76,48,150,52,4,62,140,178,2,142,100,166,192,66,16,36,38,88,102,68,34,32,
 30,164,110,42,92,138,86,54,152,116,156,72,134,200,148,46,10,176,198,84,56,78,
 130,136,74,190,158,24,184,180,80,60,20,188]
MapReduce Result of phase 1:
[2,4,6,8,10,12,14,16,18,20,22,24,26,28,30,32,34,36,38,40,42,44,46,48,50,52,54,
 56,58,60,62,64,66,68,70,72,74,76,78,80,82,84,86,88,90,92,94,96,98,100,102,
 104,106,108,110,112,114,116,118,120,122,124,126,128,130,132,134,136,138,140,
 142,144,146,148,150,152,154,156,158,160,162,164,166,168,170,172,174,176,178,
 180,182,184,186,188,190,192,194,196,198,200]
[ok,ok]
于 2014-05-13T14:03:53.197 回答