2

我正在运行一个 Spark 作业,并且在某个时候我想连接到弹性搜索服务器以获取一些数据并将它们添加到 RDD。所以我使用的代码看起来像这样

 input.mapParitions(records=>{
  val elcon=new ElasticSearchConnection
  val client:TransportClient=elcon.openConnection()
 val newRecs=records.flatMap(record=>{
      val response = client.prepareGet("index" "indexType",
      record.id.toString).execute().actionGet()
       val newRec=processRec(record,reponse)
       newRec
   })//end of flatMap
   client.close()
   newRecs
 })//end of mapPartitions

我的问题是client.close()在操作完成之前调用了该命令,flatMap这当然会导致异常。如果我在 flatMap 中移动连接的生成和关闭,则代码可以工作,但这会产生大量的连接。是否可以确保client.close在 flatMap 操作完成后调用它?

4

1 回答 1

0

对 RDD 中的每个项目进行阻塞调用以获取相应的 ElasticSearch 文档会导致问题。通常建议避免阻塞呼叫。

还有另一种使用ElasticSearch-for-Hadoop 的 Spark 支持的替代方法。

将 ElasticSearch 索引/类型作为另一个 RDD 读取,并将其与您的 RDD 连接。

包括正确版本的ESHadoop 依赖项

import org.elasticsearch.spark._
val esRdd = sc.esRDD("index/indexType")   //This returns a pair RDD of (_id, Map of all key value pairs for all fields]
input.map(record => (record.id, record))  //Convert your RDD of records to a pair rdd of (id, record) as we want to join based on the id
input.join(esRdd).map(rec => processResponse(rec._2._1, rec._2._2)) // Join the two RDDs based on id column it returns a pair RDD with key=id & value=Pair of matching records (id,(inputrddrecord,esrddrecord))

希望这可以帮助。

PS:依然无法缓解缺乏co-location的问题。(即每个带有_id 的文档将来自索引的不同分片)。更好的方法是在创建 ES 索引时实现输入 RDD 和 ES 索引文档的协同定位。

于 2016-05-06T13:31:34.150 回答