0

当 Spark 作业构建的索引可能会从作业本身和其他来源接收相同文档 ID 的更新时,我正在并发环境中工作。假设来自其他来源的更新更新鲜,Spark 作业需要默默地忽略已经存在的文档,创建所有其他文档。这非常接近于使用 op_type: create 进行索引,但后者会引发未传递给我的错误处理程序的异常。以下代码块:

          .rdd
          .repartition(getTasks(configurationManager))
          .saveJsonToEs(
            s"$indexName/_doc",
            Map(
              "es.mapping.id" -> MenuItemDocument.ID_FIELD,
              "es.write.operation" -> "create",
              "es.write.rest.error.handler.bulkErrorHandler" ->
                "<some package>.IgnoreExistsBulkWriteErrorHandler",
              "es.write.rest.error.handlers" -> "bulkErrorHandler"
            )
          )

错误处理程序在几个变化中幸存下来,但目前是:

class IgnoreExistsBulkWriteErrorHandler extends BulkWriteErrorHandler with LazyLogging {
  override def onError(entry: BulkWriteFailure, collector: DelayableErrorCollector[Array[Byte]]): HandlerResult = {
    logger.info("Encountered exception:", entry.getException)
    if (entry.getException.getMessage.contains("version_conflict_engine_exception")) {
      logger.info("Encountered document already present in index, skipping")
      HandlerResult.HANDLED
    } else {
      HandlerResult.ABORT
    }
  }
}

(我显然是首先在 getException().getCause() 中检查 org.elasticsearch.index.engine.VersionConflictEngineException ,但它没有用)

在日志中发出这个:

org.elasticsearch.hadoop.EsHadoopException: Could not write all entries for bulk operation [186/1000]. Error sample (first [5] error messages):
    org.elasticsearch.hadoop.rest.EsHadoopRemoteException: version_conflict_engine_exception: [_doc][12]: version conflict, document already exists (current version [1])

(我假设我的错误处理程序根本没有被调用)

并终止了我的整个 Spark 工作。达到我想要的结果的正确方法是什么?

4

0 回答 0