我正在运行一个 Spark 作业,并且在某个时候我想连接到弹性搜索服务器以获取一些数据并将它们添加到 RDD。所以我使用的代码看起来像这样
input.mapParitions(records=>{
val elcon=new ElasticSearchConnection
val client:TransportClient=elcon.openConnection()
val newRecs=records.flatMap(record=>{
val response = client.prepareGet("index" "indexType",
record.id.toString).execute().actionGet()
val newRec=processRec(record,reponse)
newRec
})//end of flatMap
client.close()
newRecs
})//end of mapPartitions
我的问题是client.close()
在操作完成之前调用了该命令,flatMap
这当然会导致异常。如果我在 flatMap 中移动连接的生成和关闭,则代码可以工作,但这会产生大量的连接。是否可以确保client.close
在 flatMap 操作完成后调用它?