0

我的最终目标是使用 PySpark 有效地索引 Elasticsearch (ES) 中的大量数据,然后针对索引运行大量查询并记录结果的统计信息。

Elasticsearch version 5.6.5
Spark version 2.4.0
Hadoop version 2.7
Elasticsearch-Hadoop python library version: 6.6.0

考虑以下代码:

from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SQLContext

# create our Spark Context  
sc_conf = SparkConf().setAll((
    ("spark.serializer", "org.apache.spark.serializer.KryoSerializer"),
))
sc_conf.setAppName("PythonSparkStreaming")

sc = SparkContext(conf=sc_conf)

sqlContext = SQLContext(sc)

q ="""{
  "query": {
    "match_all": {}
  }  
}"""

es_live_conf["es.query"] = q

es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable", 
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf=es_live_conf)

sqlContext.createDataFrame(es_rdd).limit(1).collect()

我只是想对索引运行 match all 查询,我只想要最高的结果。我尝试在 ES 查询中表达限制,但显然 Spark 忽略了这一点,所以我用数据框过滤器来表达它。

我已将 Spark 配置如下:

es_live_conf = {

# specify the node that we are sending data to (this should be the master)
"es.nodes" : 'xxx.xxxx.com',

# specify the port in case it is not the default port
"es.port" : ES_PORT,

# specify a resource in the form 'index/doc-type'
"es.resource" : 'xxxxxx/document',

"es.net.http.auth.user" : ES_USERNAME,

"es.net.http.auth.pass" : ES_PASSWORD,

"es.net.ssl":"true",

"es.net.ssl.cert.allow.self.signed": "true",

"es.nodes.discovery": "false",

"es.nodes.wan.only": "true",

"es.index.read.missing.as.empty": "true",

}

我正在访问 VPC 后面的 ES 集群,因此我只能访问客户端节点,而不能访问内部数据等节点。这就是wan.only设置为 true 的原因。

通过这种设置,Spark 似乎会以完全匹配的方式查询每个节点,然后最终合并为我真正想要的单个结果。它非常慢(50 个分片,3000 万个文档),它完全避免了 ES 有效减少每个节点本身的结果的能力。即使我将查询更改为通过单个文档 ID 专门搜索,它也会通过主节点通过在每次调用中指定特定的分片 ID 来针对每个单独的分片运行查询。我尝试将 设置es.nodes.client.only为 true,但这抱怨设置与wan.only. 如果启用client.only和禁用,wan.only我将无法再连接到集群,因为它会尝试直接连接无法访问的每个节点。

我在这里做错了什么?如何使用 PySpark 对 ES 运行一次查询,而不是为每个分片运行一次。此外,如果 PySpark 尝试在每个分片上运行完整查询然后看似 post 处理结果,我该如何使用from,size和我的查询之类的东西?rescore

4

1 回答 1

0

我找不到使用 ES Hadoop 库解决此问题的方法。当您需要对单个 Elasticsearch 查询返回的结果运行一个非常长、非常复杂的 reduce 步骤时,它似乎更适合使用 Spark,而不是运行数百万个快速 ES 查询并聚合结果。为了解决这个问题,我最终使用了这个插件:https ://github.com/sourav-mazumder/Data-Science-Extensions/tree/master/spark-datasource-rest

我实际上进一步开发了它,以便每个内核可以使用多个线程来并行执行更多请求。它不仅可以让您对 ES 集群进行 DDOS,还可以为您可能需要消耗和聚合大量请求的任何平台提供任何静态端点。

如果我能搞清楚,我也会在 github 上发布我公开创建的多线程版本。

于 2019-07-11T18:21:50.190 回答