0

我正在使用 Python 3 在 PySpark 中使用 elasticsearch-py 客户端,并且在使用 ES 和 RDD 的 analyze() 函数时遇到了问题。特别是,我的 RDD 中的每条记录都是一个文本字符串,我正在尝试分析它以获取令牌信息,但是在 Spark 中的 map 函数中尝试使用它时出现错误。

例如,这工作得很好:

from elasticsearch import Elasticsearch
es = Elasticsearch()
t = 'the quick brown fox'
es.indices.analyze(text=t)['tokens'][0]

{'end_offset': 3,
'position': 1,
'start_offset': 0,
'token': 'the',
'type': '<ALPHANUM>'}

但是,当我尝试这个时:

trdd = sc.parallelize(['the quick brown fox'])
trdd.map(lambda x: es.indices.analyze(text=x)['tokens'][0]).collect()

我收到一条与酸洗相关的非常长的错误消息(到此结束):

(self, obj)    109if'recursion'in.[0]:    110="""Could not pickle object as excessively deep recursion required."""--> 111                  picklePicklingErrormsg

  save_memoryviewself obj

: Could not pickle object as excessively deep recursion required.

raise.()    112    113def(,):PicklingError

我不确定错误是什么意思。难道我做错了什么?有没有办法将 ES 分析函数映射到 RDD 的记录上?

编辑:从 elasticsearch-py 应用其他函数时,我也遇到了这种行为(例如,es.termvector())。

4

1 回答 1

1

本质上,Elasticsearch客户端是不可序列化的。所以你需要做的是为每个分区创建一个客户端实例,并处理它们:

def get_tokens(part): es = Elasticsearch() yield [es.indices.analyze(text=x)['tokens'][0] for x in part] rdd = sc.parallelize([['the quick brown fox'], ['brown quick dog']], numSlices=2) rdd.mapPartitions(lambda p: get_tokens(p)).collect()

应该给出以下结果: Out[17]: [[{u'end_offset': 3, u'position': 1, u'start_offset': 0, u'token': u'the', u'type': u'<ALPHANUM>'}], [{u'end_offset': 5, u'position': 1, u'start_offset': 0, u'token': u'brown', u'type': u'<ALPHANUM>'}]]

请注意,对于大型数据集,这将非常低效,因为它涉及对数据集中每个元素的 ES REST 调用。

于 2015-08-24T10:38:37.150 回答