1

谁能举例说明如何将 Erlang 模块和 Erlang 函数传给

query.map()

(在 Python Riak 客户端中)。文档中的说明如下:

function (string, list) – Either a named Javascript function (ie: ‘Riak.mapValues’), or an anonymous javascript function (ie: ‘function(...) ... ‘ or an array [‘erlang_module’, ‘function’].
options (dict) – phase options, containing ‘language’, ‘keep’ flag, and/or ‘arg’.

但是文档并没有明确说明具体需要传入什么。实际上,我一直是这样调用 query.map() 阶段的:

query.map(['maps','fun']) # maps is the maps.erl and fun is the function in the maps.erl file

如文档中所述,我已在 app.config 中设置了 .beam 文件路径,用于存放已编译的 .beam 文件。这些我都做了,但是运行命令后仍然出现错误:

 query.map(['maps','funs'])
 >>> query.run()
 Traceback (most recent call last):
 File "<input>", line 1, in <module>
 File "/usr/lib/python2.6/site-packages/riak-1.5.2-py2.6.egg/riak/mapreduce.py", line 234, in run
 result = t.mapred(self._inputs, query, timeout)
 File "/usr/lib/python2.6/site-packages/riak-1.5.2-py2.6.egg/riak/transports/http.py", line 322, in mapred
(repr(response[0]), repr(response[1])))
 Exception: Error running MapReduce operation. Headers: {'date': 'Mon, 26 May 2014 11:24:04 GMT', 'content-length': '1121', 'content-type': 'application/json'
, 'http_code': 500, 'server': 'MochiWeb/1.1 WebMachine/1.10.0 (never breaks eye contact)'} Body: '{"phase":0,"error":"undef","input":"{ok,{r_object,<<\\"tst\
\">>,<<\\"test5\\">>,[{r_content,{dict,3,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],[],[],[],[],[],[],[],[],[[<<\\"X-Riak-VTag\\
">>,50,53,75,69,55,80,113,109,65,69,117,106,109,109,99,65,72,101,75,82,115,86]],[[<<\\"index\\">>]],[],[[<<\\"X-Riak-Last-Modified\\">>|{1400,340359,663135}]
],[],[]}}},<<\\"6\\">>}],[{<<197,82,177,11,83,115,139,10>>,{1,63567559559}}],   {dict,1,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],
[],[],[],[],[],[],[],[],[],...}}},...},...}","type":"error","stack":"[{maps,funs,  [{r_object,<<\\"tst\\">>,<<\\"test5\\">>,[{r_content,{dict,3,16,16,8,80,48,{
[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],[],[],[],[],[],[],[],[], [[<<\\"X-Riak-VTag\\">>,50,53,75,69,55,80,113,109,65,69,117,106,109,109,99,6
5,72,101,75,82,115,86]],[[<<\\"index\\">>]],[],[[<<\\"X-Riak-Last-Modified\\">>|{1400,340359,663135}]],[],[]}}},<<\\"6\\">>}],[{<<197,82,177,11,83,115,139,10
>>,{1,63567559559}}],{dict,1,16,16,8,80,48,{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},{{[],[],[],[],[],[],[],...}}},...},...],...},...]"}'

我遗漏了什么,或者哪里做错了?请给我一些建议。

4

1 回答 1

2

在 Python 客户端中使用 Erlang 映射函数有 3 个部分:

  • 编写和编译 Erlang 模块
  • 准备 Riak 集群
  • 从 Python 客户端调用函数

Erlang 模块应该相当简单,对于这个例子,我将让 map 函数返回每个键的值(兄弟)的数量:

-module(custom_mr).

-export([mapcount/3]).

%% @doc Map phase: emit, as a one-element list, the number of values
%% (siblings) held by the Riak object. The key data and the phase argument
%% are ignored.
mapcount(RiakObject, _KeyData, _PhaseArg) ->
  Siblings = riak_object:get_values(RiakObject),
  [length(Siblings)].

Erlang 的版本以微妙的方式变化,因此使用 Riak 捆绑的 Erlang 会更安全,或者如果您从源代码构建,则使用相同的版本来编译它。生成的 .beam 文件需要放置在运行 Riak 的用户可读的目录中 - 如果您使用软件包安装,则默认为 riak。您将需要部署 .beam 文件并修改集群中每个节点的 app.config。

# /usr/lib/riak/erts-5.9.1/bin/erlc custom_mr.erl
# mkdir /var/lib/riak/custom_code
# mv custom_mr.beam /var/lib/riak/custom_code
# chown -R riak:riak /var/lib/riak/custom_code

然后编辑 app.config,在 riak_kv 部分中添加 {add_paths,["/var/lib/riak/custom_code"]},然后重新启动节点。

使用 riak attach 进行测试,以确保新模块已加载 - 在此示例中,节点 1-4 已加载模块,但节点 5 已关闭:

# riak attach
1> riak_core_util:rpc_every_member_ann(code,which,[custom_mr]).
{[{'riak@node1.lab.local',"/var/lib/riak/custom_code/custom_mr.beam"},
  {'riak@node2.lab.local',"/var/lib/riak/custom_code/custom_mr.beam"},
  {'riak@node3.lab.local',"/var/lib/riak/custom_code/custom_mr.beam"},
  {'riak@node4.lab.local',"/var/lib/riak/custom_code/custom_mr.beam"}],
 ['riak@node5.lab.local']}
2> custom_mr:mapcount(riak_object:new(<<"test">>,<<"test">>,<<"test">>),keydata,arg).
[1]

(如果您运行的是 1.4 之前的版本,则使用 ctrl-d 与 riak 控制台分离,否则使用 ctrl-c a)

最后,Python 代码(我使用了文件名 test.py):

import riak

# Single source of truth for the bucket name: the objects are stored in this
# bucket and the MapReduce job must read from the very same bucket, otherwise
# the query runs over an unrelated (probably empty) bucket.
BUCKET_NAME = 'test'

client = riak.RiakClient()

test_bucket = client.bucket(BUCKET_NAME)

# Store three sample JSON documents.
data1 = test_bucket.new('key1',data={'field1':'1data1','field2':'1data2','field3':1})
data1.store()

data2 = test_bucket.new('key2',data={'field1':'2data1','field2':'2data2','field3':2})
data2.store()

data3 = test_bucket.new('key3',data={'field1':'3data1','field2':'3data2','field3':3})
data3.store()

# Use the whole bucket as input and run the Erlang function
# custom_mr:mapcount/3 as the map phase; the phase is given as
# [module, function].
query = riak.RiakMapReduce(client).add(BUCKET_NAME)
query.map(['custom_mr','mapcount'])

for result in query.run():
    # Wrap the result in a one-element tuple so that a tuple-valued result
    # is printed as-is instead of being unpacked as format arguments.
    print("%s" % (result,))

运行此代码将为存储桶中的每个键返回 1:

#python test.py
1
1
1

注意:在写入新值之前我没有先执行 get,所以如果你的默认存储桶属性包括 allow_mult:true,第二次运行这段代码会为每个键再创建一个兄弟值,你会得到 '2' 而不是 '1'。


添加更多示例


新建模块,如上编译安装

-module(custom_mr).

-export([mapcount/3,
         mapvalue/3,
         mapfield/3,
         mapfieldwithid/3,
         reducecount/2,
         reducepropsort/2,
         reducedoublesort/2,
         reducesort/2]).

%% @doc Map phase: return a one-element list holding how many values
%% (siblings) the Riak object carries. Key data and phase argument are unused.
mapcount(RiakObject, _KeyData, _PhaseArg) ->
  Siblings = riak_object:get_values(RiakObject),
  [length(Siblings)].

%% @doc Map phase: return a one-element list holding the first value of the
%% Riak object. Any further siblings are ignored, as are the key data and the
%% phase argument.
mapvalue(RiakObject, _KeyData, _PhaseArg) ->
  Values = riak_object:get_values(RiakObject),
  [hd(Values)].

%% @doc Map phase: emit the object's first value, or a single field of it,
%% as one JSON-encoded binary.
%%
%% When `Arg' is the atom `null' every field of the decoded JSON object is
%% emitted; otherwise only the field named by `Arg' (a missing field yields
%% the atom `undefined', which mochijson2 encodes as a string).
%% If the value cannot be decoded as a JSON object the result is
%% `{"<Arg>":{"error":"notjson"}}'.
mapfield(Obj, _Kd, Arg) ->
  [encode_json(select_fields(Obj, Arg))].

%% @doc Same as mapfield/3, but the emitted JSON object starts with the
%% `bucket' and `key' of the Riak object the data came from.
mapfieldwithid(Obj, _Kd, Arg) ->
  Id = [{bucket, riak_object:bucket(Obj)}, {key, riak_object:key(Obj)}],
  [encode_json(Id ++ select_fields(Obj, Arg))].

%% Decode the first value of `Obj' and return the selected fields as a
%% property list that mochijson2:encode/1 accepts.
select_fields(Obj, Arg) ->
  try mochijson2:decode(hd(riak_object:get_values(Obj))) of
    {struct, Fields} when Arg =:= null ->
      Fields;
    {struct, Fields} ->
      [{Arg, proplists:get_value(Arg, Fields)}];
    _NotAnObject ->
      not_json(Arg)
  catch
    %% Deliberately broad: any failure while fetching or decoding the value
    %% is reported in-band as "not JSON" instead of failing the whole phase.
    _:_ ->
      not_json(Arg)
  end.

%% Error marker for a value that is not a JSON object. The marker is wrapped
%% in `{struct, ...}' because a bare `{error, notjson}' tuple is not a term
%% mochijson2:encode/1 can encode.
not_json(Arg) ->
  [{Arg, {struct, [{error, notjson}]}}].

%% mochijson2:encode/1 returns an iolist; flatten it into a single binary.
encode_json(Term) ->
  iolist_to_binary(mochijson2:encode(Term)).

%% @doc Reduce phase: add up all integer inputs and return the total as a
%% one-element list. Inputs that are not integers are skipped; the phase
%% argument is ignored.
reducecount(Inputs, _PhaseArg) ->
  AddIfInteger =
    fun(N, Total) when is_integer(N) -> Total + N;
       (_Other, Total) -> Total
    end,
  [lists:foldl(AddIfInteger, 0, Inputs)].

%% Build an ordering fun for lists:sort/2 that compares two property lists by
%% the value stored under `Field'. A property list without that field is
%% compared as if it held the binary <<"zzzz">>.
sortfun(Field) ->
  fun(PropsA, PropsB) ->
    ValueA = proplists:get_value(Field, PropsA, <<"zzzz">>),
    ValueB = proplists:get_value(Field, PropsB, <<"zzzz">>),
    ValueA =< ValueB
  end.

%% @doc Reduce phase: decode every input as JSON, keep the ones that decode to
%% a JSON object, order them with sortfun(Arg) (i.e. by the field named by the
%% phase argument) and return each object re-encoded as a JSON binary.
reducepropsort(Inputs, Arg) ->
  DecodedTerms = lists:map(fun mochijson2:decode/1, Inputs),
  Objects = [Props || {struct, Props} <- DecodedTerms, is_list(Props)],
  Ordered = lists:sort(sortfun(Arg), Objects),
  lists:map(fun(Props) -> list_to_binary(mochijson2:encode(Props)) end, Ordered).

%% @doc Reduce phase: return the inputs sorted in standard Erlang term order.
%% The phase argument is ignored.
reducesort(Inputs, _PhaseArg) ->
  lists:sort(Inputs).

%% @doc Reduce phase: like reducepropsort/2, but the fields inside each JSON
%% object are sorted as well. Every input is decoded as JSON; inputs that
%% decode to a JSON object have their field list sorted, the objects are then
%% ordered with sortfun(Arg), and each one is re-encoded as a JSON binary.
reducedoublesort(Inputs, Arg) ->
  DecodedTerms = lists:map(fun mochijson2:decode/1, Inputs),
  Objects = [lists:sort(Props) || {struct, Props} <- DecodedTerms, is_list(Props)],
  Ordered = lists:sort(sortfun(Arg), Objects),
  lists:map(fun(Props) -> list_to_binary(mochijson2:encode(Props)) end, Ordered).

Python代码

import riak

# Connect to the Riak node over the protocol buffers interface.
client = riak.RiakClient(protocol='pbc', host="172.31.0.1", pb_port=8087)

test_bucket = client.bucket('test_bucket')

# Sample JSON documents, listed in the order in which they are stored.
sample_docs = [
  ('key1', {'field1':'1data1','field2':'1data2','field3':1, 'zone':'D'}),
  ('key2', {'field1':'2data1','field2':'2data2','field3':2, 'zone':'A'}),
  ('key3', {'field1':'3data1','field2':'3data2','field3':3, 'zone':'C'}),
]

for doc_key, doc_fields in sample_docs:
  doc = test_bucket.new(doc_key, data=doc_fields)
  doc.store()


def printresult(q):
  """Run the MapReduce query `q` and print every result on its own line."""
  for item in q.run():
    print("%s" % (item))

# Each entry describes one demonstration query:
#   (heading to print,
#    positional arguments for RiakMapReduce.add()  -- the inputs,
#    positional arguments for query.map()          -- [module, function] and optional options,
#    positional arguments for query.reduce(), or None when there is no reduce phase)
demo_queries = [
  ("\nCount the number of values in the bucket",
   ('test_bucket',),
   (['custom_mr','mapcount'],),
   (['custom_mr','reducecount'],)),

  ("\nList all values in natual sort order",
   ('test_bucket',),
   (['custom_mr','mapvalue'],),
   (['custom_mr','reducesort'],)),

  ("\nList all values sorted by 'zone'",
   ('test_bucket',),
   (['custom_mr','mapfield'],),
   (['custom_mr','reducepropsort'], {'arg':'zone'})),

  ("\nList all values sorted by 'zone', also sort the fields in each object",
   ('test_bucket',),
   (['custom_mr','mapfield'],),
   (['custom_mr','reducedoublesort'], {'arg':'zone'})),

  ("\nList just field3, sorted",
   ('test_bucket',),
   (['custom_mr','mapfield'], {'arg':'field3'}),
   (['custom_mr','reducepropsort'], {'arg':'field3'})),

  ("\nList just bucket,key,field3, sorted by field3",
   ('test_bucket',),
   (['custom_mr','mapfieldwithid'], {'arg':'field3'}),
   (['custom_mr','reducepropsort'], {'arg':'field3'})),

  ("\nReturn just the zone for key2",
   ('test_bucket', 'key2'),
   (['custom_mr','mapfield'], {'arg':'zone'}),
   None),

  ("\nReturn the bucket,key,zone for key1 and key3",
   ('test_bucket', ['key1','key3']),
   (['custom_mr','mapfieldwithid'], {'arg':'zone'}),
   (['custom_mr','reducepropsort'], {'arg':'zone'})),
]

for heading, inputs, map_args, reduce_args in demo_queries:
  print(heading)
  query = riak.RiakMapReduce(client).add(*inputs)
  query.map(*map_args)
  # Only some of the demonstrations have a reduce phase.
  if reduce_args is not None:
    query.reduce(*reduce_args)
  printresult(query)

请注意:其中许多示例对整个存储桶运行 MapReduce,开销很大,在大量数据上使用时可能会影响性能。最后两个示例展示了如何选择特定键或键列表作为输入。如果集群启用了二级索引或 Riak 搜索,它们也可以用作输入,请参阅 riak-python-client 文档中的查询输入部分。

和输出:

# python ~/test.py

Count the number of values in the bucket
3

List all values in natual sort order
{"field2": "1data2", "field3": 1, "field1": "1data1", "zone": "D"}
{"field2": "2data2", "field3": 2, "field1": "2data1", "zone": "A"}
{"field2": "3data2", "field3": 3, "field1": "3data1", "zone": "C"}

List all values sorted by 'zone'
{"field2":"2data2","field3":2,"field1":"2data1","zone":"A"}
{"field2":"3data2","field3":3,"field1":"3data1","zone":"C"}
{"field2":"1data2","field3":1,"field1":"1data1","zone":"D"}

List all values sorted by 'zone', also sort the fields in each object
{"field1":"2data1","field2":"2data2","field3":2,"zone":"A"}
{"field1":"3data1","field2":"3data2","field3":3,"zone":"C"}
{"field1":"1data1","field2":"1data2","field3":1,"zone":"D"}

List just field3, sorted
{"field3":1}
{"field3":2}
{"field3":3}

List just bucket,key,field3, sorted by field3
{"bucket":"test_bucket","key":"key1","field3":1}
{"bucket":"test_bucket","key":"key2","field3":2}
{"bucket":"test_bucket","key":"key3","field3":3}

Return just the zone for key2
{"zone":"A"}

Return the bucket,key,zone for key1 and key3
{"bucket":"test_bucket","key":"key3","zone":"C"}
{"bucket":"test_bucket","key":"key1","zone":"D"}
于 2014-05-27T17:39:40.840 回答