我的最终目标是使用 PySpark 有效地索引 Elasticsearch (ES) 中的大量数据,然后针对索引运行大量查询并记录结果的统计信息。
Elasticsearch version 5.6.5
Spark version 2.4.0
Hadoop version 2.7
Elasticsearch-Hadoop python library version: 6.6.0
考虑以下代码:
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SQLContext
# create our Spark Context
sc_conf = SparkConf().setAll((
("spark.serializer", "org.apache.spark.serializer.KryoSerializer"),
))
sc_conf.setAppName("PythonSparkStreaming")
sc = SparkContext(conf=sc_conf)
sqlContext = SQLContext(sc)
q ="""{
"query": {
"match_all": {}
}
}"""
es_live_conf["es.query"] = q
es_rdd = sc.newAPIHadoopRDD(
inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
keyClass="org.apache.hadoop.io.NullWritable",
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=es_live_conf)
sqlContext.createDataFrame(es_rdd).limit(1).collect()
我只是想对索引运行 match all 查询,我只想要最高的结果。我尝试在 ES 查询中表达限制,但显然 Spark 忽略了这一点,所以我用数据框过滤器来表达它。
我已将 Spark 配置如下:
es_live_conf = {
# specify the node that we are sending data to (this should be the master)
"es.nodes" : 'xxx.xxxx.com',
# specify the port in case it is not the default port
"es.port" : ES_PORT,
# specify a resource in the form 'index/doc-type'
"es.resource" : 'xxxxxx/document',
"es.net.http.auth.user" : ES_USERNAME,
"es.net.http.auth.pass" : ES_PASSWORD,
"es.net.ssl":"true",
"es.net.ssl.cert.allow.self.signed": "true",
"es.nodes.discovery": "false",
"es.nodes.wan.only": "true",
"es.index.read.missing.as.empty": "true",
}
我正在访问 VPC 后面的 ES 集群,因此我只能访问客户端节点,而不能访问内部数据等节点。这就是wan.only
设置为 true 的原因。
通过这种设置,Spark 似乎会以完全匹配的方式查询每个节点,然后最终合并为我真正想要的单个结果。它非常慢(50 个分片,3000 万个文档),它完全避免了 ES 有效减少每个节点本身的结果的能力。即使我将查询更改为通过单个文档 ID 专门搜索,它也会通过主节点通过在每次调用中指定特定的分片 ID 来针对每个单独的分片运行查询。我尝试将 设置es.nodes.client.only
为 true,但这抱怨设置与wan.only
. 如果启用client.only
和禁用,wan.only
我将无法再连接到集群,因为它会尝试直接连接无法访问的每个节点。
我在这里做错了什么?如何使用 PySpark 对 ES 运行一次查询,而不是为每个分片运行一次。此外,如果 PySpark 尝试在每个分片上运行完整查询然后看似 post 处理结果,我该如何使用from
,size
和我的查询之类的东西?rescore