当 Spark 作业构建的索引可能会从作业本身和其他来源接收相同文档 ID 的更新时,我正在并发环境中工作。假设来自其他来源的更新更新鲜,Spark 作业需要默默地忽略已经存在的文档,创建所有其他文档。这非常接近于使用 op_type: create 进行索引,但后者会引发未传递给我的错误处理程序的异常。以下代码块:
.rdd
.repartition(getTasks(configurationManager))
.saveJsonToEs(
s"$indexName/_doc",
Map(
"es.mapping.id" -> MenuItemDocument.ID_FIELD,
"es.write.operation" -> "create",
"es.write.rest.error.handler.bulkErrorHandler" ->
"<some package>.IgnoreExistsBulkWriteErrorHandler",
"es.write.rest.error.handlers" -> "bulkErrorHandler"
)
)
错误处理程序在几个变化中幸存下来,但目前是:
class IgnoreExistsBulkWriteErrorHandler extends BulkWriteErrorHandler with LazyLogging {
override def onError(entry: BulkWriteFailure, collector: DelayableErrorCollector[Array[Byte]]): HandlerResult = {
logger.info("Encountered exception:", entry.getException)
if (entry.getException.getMessage.contains("version_conflict_engine_exception")) {
logger.info("Encountered document already present in index, skipping")
HandlerResult.HANDLED
} else {
HandlerResult.ABORT
}
}
}
(我显然是首先在 getException().getCause() 中检查 org.elasticsearch.index.engine.VersionConflictEngineException ,但它没有用)
在日志中发出这个:
org.elasticsearch.hadoop.EsHadoopException: Could not write all entries for bulk operation [186/1000]. Error sample (first [5] error messages):
org.elasticsearch.hadoop.rest.EsHadoopRemoteException: version_conflict_engine_exception: [_doc][12]: version conflict, document already exists (current version [1])
(我假设我的错误处理程序根本没有被调用)
并终止了我的整个 Spark 工作。达到我想要的结果的正确方法是什么?