我正在使用以下代码在 Spark 数据帧和 Elasticsearch 之间写入/读取数据:
df.write.format("org.elasticsearch.spark.sql")
.option("es.nodes" , [MY_ES_IP])
.option("es.port",[MY_ES_PORT])
...
.option("es.index.auto.create","true")
.option("es.resouce.auto.create",[INDEX]/[DATA])
.save([INDEX]/[DATA])
和
val df = spark.read.format("org.elasticsearch.spark.sql")
.option("es.nodes" , [MY_ES_IP])
.option("es.port",[MY_ES_PORT])
...
.load([INDEX]/[DATA])
我想检索对我的数据的重要条款请求的结果,但我找不到任何关于如何使用 Spark 实现此目的的示例。
我想做的 DSL 请求:
{
"query" : {
"terms" : {[BUCKET REQUEST]}
},
"aggregations" : {
"significant_elements" : {
"significant_terms" : { "field" : [FIELD NAME] }
}
}
}
有没有办法只使用 org.elasticsearch.spark.sql 库来实现这一点?
编辑 :
我尝试通过以下方式解决问题:
val myquery = "{\"query\" : {\"terms\" : [BUCKET REQUEST]},\"aggregations\" : {\"significant_elements\" : {\"significant_terms\" : { \"field\" : [FIELD NAME]}}}}"
val df = spark.read.format("org.elasticsearch.spark.sql")
.option("es.nodes" , [MY_ES_IP])
.option("es.port",[MY_ES_PORT])
...
.option("query", myquery)
.option("pushdown", "true")
.load([INDEX]/[DATA])
但是我在结果中得到的数据框只是存储桶请求的结果。我仍在寻找如何获得每个 [FIELD NAME] 的“显着分数”