我正在使用 Elasticsearch-Hadoop 连接器将 DStream 写入 Elasticsearch。这是您可以找到连接器的链接 https://www.elastic.co/guide/en/elasticsearch/hadoop/5.6/spark.html
我需要处理窗口,使用“JavaEsSpark.saveToEs”方法将所有文档写入 ES,并希望确保所有文档写入并将偏移量提交到 Kafka。由于 JavaEsSpark.saveToEs 以批处理模式插入文档,因此我无法跟踪我的文档。
我的基本代码如下。有意见吗?
dstream.foreachRDD((items, time) -> {
JavaEsSpark.saveToEs(items,"myindex/mytype");
//wait until all the documents written
//do somehing else then return (actually the job is committing kafka offsets)
});