2

我正在使用以下代码在 Spark 数据帧和 Elasticsearch 之间写入/读取数据:

df.write.format("org.elasticsearch.spark.sql")
.option("es.nodes" , [MY_ES_IP])
.option("es.port",[MY_ES_PORT])

   ...

.option("es.index.auto.create","true")
.option("es.resouce.auto.create",[INDEX]/[DATA])
.save([INDEX]/[DATA])

val df = spark.read.format("org.elasticsearch.spark.sql")
     .option("es.nodes" , [MY_ES_IP])
     .option("es.port",[MY_ES_PORT])

     ...

     .load([INDEX]/[DATA])

我想检索对我的数据的重要条款请求的结果,但我找不到任何关于如何使用 Spark 实现此目的的示例。

我想做的 DSL 请求:

{
    "query" : {
        "terms" : {[BUCKET REQUEST]}
    },
    "aggregations" : {
        "significant_elements" : {
            "significant_terms" : { "field" : [FIELD NAME] }
        }
    }
}

有没有办法只使用 org.elasticsearch.spark.sql 库来实现这一点?

编辑 :

我尝试通过以下方式解决问题:

val myquery = "{\"query\" : {\"terms\" : [BUCKET REQUEST]},\"aggregations\" : {\"significant_elements\" : {\"significant_terms\" : { \"field\" : [FIELD NAME]}}}}"

val df = spark.read.format("org.elasticsearch.spark.sql")
                     .option("es.nodes" , [MY_ES_IP])
                     .option("es.port",[MY_ES_PORT])

                     ...

                     .option("query", myquery)
                     .option("pushdown", "true")
                     .load([INDEX]/[DATA])

但是我在结果中得到的数据框只是存储桶请求的结果。我仍在寻找如何获得每个 [FIELD NAME] 的“显着分数”

4

0 回答 0