0

我正在使用 Elasticsearch-Hadoop 连接器将 DStream 写入 Elasticsearch。这是您可以找到连接器的链接 https://www.elastic.co/guide/en/elasticsearch/hadoop/5.6/spark.html

我需要处理窗口,使用“JavaEsSpark.saveToEs”方法将所有文档写入 ES,并希望确保所有文档写入并将偏移量提交到 Kafka。由于 JavaEsSpark.saveToEs 以批处理模式插入文档,因此我无法跟踪我的文档。

我的基本代码如下。有意见吗?

    dstream.foreachRDD((items, time) -> {
        JavaEsSpark.saveToEs(items,"myindex/mytype");
        //wait until all the documents written
        //do somehing else then return (actually the job is committing kafka offsets)
});
4

1 回答 1

0

您可以将函数封装在 Try 中(这是一个 Scala 示例):

 Try {
  rdd.saveToEs(AppSettings.Elastic.Resource, configuration)
} match {
  case Failure(f) =>
    logger.error(s"SaveToEs failed: $f") //or whatever you want
  case _ =>
}
于 2020-03-23T10:00:21.427 回答