
I am trying to execute this Elasticsearch query via Spark:

    POST /aa6/_mtermvectors  
    {
      "ids": [
        "ABC",
        "XYA",
        "RTE"
      ],
      "parameters": {
        "fields": [
          "attribute"
        ],
        "term_statistics": true,
        "offsets": false,
        "payloads": false,
        "positions": false
      }
    }

The code I have written in Zeppelin is:

    def createString(): String = {
      return s"""_mtermvectors {
        "ids": [
          "ABC",
          "XYA",
          "RTE"
        ],
        "parameters": {
          "fields": [
            "attribute"
          ],
          "term_statistics": true,
          "offsets": false,
          "payloads": false,
          "positions": false
        }
      }"""
    }

    import org.elasticsearch.spark._
    sc.esRDD("aa6", "?q=" + createString).count

I get the error:

    org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: org.elasticsearch.hadoop.rest.EsHadoopRemoteException: parse_exception: parse_exception: Encountered "<RANGE_GOOP>" ["RTE" "XYA" "ABC" ""] at line 1, column 22. Was expecting: "TO" ...

    {"query":{"query_string":{"query":"_mtermvectors {\"ids\": [\"RTE\",\"ABC\",\"XYA\"], \"parameters\": {\"fields\": [\"attribute\"], \"term_statistics\": true, \"offsets\": false, \"payloads\": false, \"positions\": false } }"}}}
        at org.elasticsearch.hadoop.rest.RestClient.checkResponse(RestClient.java:477)
        at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:434)
        at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:428)
        at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:408)

It is probably something simple, but I have not been able to find a way to set the request body when making the Spark call.


1 Answer


I am not sure, but I don't think this is currently supported by es-Spark. You can check which options are available for `sparkContext`'s `esRDD` via this link.

What you can do instead is use Elasticsearch's High Level REST Client, collect the details into a List, Seq, or a file, and then load that into a Spark RDD.

It is a roundabout way, but unfortunately I believe it is the only one. To help, I have put together the following snippet, so that you at least have the data you need from Elasticsearch for the query above.

    import org.apache.http.HttpHost
    import org.elasticsearch.client.RequestOptions
    import org.elasticsearch.client.RestClient
    import org.elasticsearch.client.RestHighLevelClient
    import org.elasticsearch.client.core.MultiTermVectorsRequest
    import org.elasticsearch.client.core.TermVectorsRequest
    import org.elasticsearch.client.core.TermVectorsResponse

    object SampleSparkES {

      /**
       * Main class where the program starts
       */
      def main(args: Array[String]): Unit = {

        val termVectorsResponse = elasticRestClient

        println(termVectorsResponse.size)
      }

      /**
       * Scala client code to retrieve the response of mtermvectors
       */
      def elasticRestClient: java.util.List[TermVectorsResponse] = {

        val client = new RestHighLevelClient(
          RestClient.builder(new HttpHost("localhost", 9200, "http")))

        // Template request against index "aa6"; mirror the parameters of the
        // original query
        val tvRequestTemplate = new TermVectorsRequest("aa6", "ids")
        tvRequestTemplate.setFields("attribute")
        tvRequestTemplate.setTermStatistics(true)
        tvRequestTemplate.setOffsets(false)
        tvRequestTemplate.setPayloads(false)
        tvRequestTemplate.setPositions(false)

        // Set the document ids you want the term vector information for
        val ids = Array("1", "2", "3")
        val request = new MultiTermVectorsRequest(ids, tvRequestTemplate)
        val response = client.mtermvectors(request, RequestOptions.DEFAULT)

        // Get the responses
        val termVectorsResponse = response.getTermVectorsResponses

        // Close the RestHighLevelClient
        client.close()

        // Return List[TermVectorsResponse]
        termVectorsResponse
      }
    }

For example, you can get the sumDocFreq of the first document like this:

    println(termVectorsResponse.iterator.next.getTermVectorsList.iterator.next.getFieldStatistics.getSumDocFreq)

All you need to do now is find a way to convert the collection into a Seq in a form that can be loaded into an RDD.
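As a minimal sketch of that last step (kept generic so it does not depend on the Elasticsearch client classes), `scala.collection.JavaConverters` turns the returned `java.util.List` into a Scala `Seq` that `sc.parallelize` accepts. Note that you may want to map each `TermVectorsResponse` to plain serializable values (id, statistics, etc.) before parallelizing:

```scala
import scala.collection.JavaConverters._

// Generic helper: convert any java.util.List (such as the
// List[TermVectorsResponse] returned by elasticRestClient) into a Scala Seq.
def toSeq[A](javaList: java.util.List[A]): Seq[A] = javaList.asScala.toSeq

// Stand-in values in place of TermVectorsResponse objects:
val javaList = new java.util.ArrayList[String]()
javaList.add("ABC")
javaList.add("XYA")
javaList.add("RTE")
val scalaSeq = toSeq(javaList)

// In the Zeppelin note you could then do (sc is the provided SparkContext):
// val rdd = sc.parallelize(toSeq(SampleSparkES.elasticRestClient))
// rdd.count
```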

Answered on 2020-07-12T14:15:03.997