1

我在 flink 中通过查询 API 使用 ElasticSearch 更新,flink 并行度为 1。但是我得到了 version_conflict_engine_exception,这是我在 flink RichSinkFunction 中的代码,如下所示:

        UpdateByQueryRequestBuilder builder = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
        builder.abortOnVersionConflict(true);
        builder.source(indexName);
        builder.filter(filter);
        builder.setMaxRetries(MAX_RETRIES);
        builder.refresh(true);

        String updateTime = Instant.ofEpochMilli(ts).atZone(ZoneId.systemDefault())
                .format(ELASTIC_SEARCH_DATE_TIME_FORMATTER);

        Map<String, Object> params = Maps.newHashMap();
        params.put("fieldName", fieldName);
        params.put("updateTime", updateTime);
        params.put("model", this.transformMap(JacksonUtils.convertValue(model, new TypeReference<Map<String, Object>>() {
        })));

        builder.script(new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, UPDATE_BY_MODEL_PAINLESS_CODE, params));
        BulkByScrollResponse response = builder.get();

我可以肯定的是,只有这个应用程序访问 Elasticsearch,flink 并行度是 1,就像在单线程调用查询 API 更新中一样?为什么我得到了 version_conflict_engine_exception?以及如何只做一次?

4

1 回答 1

0

我看到两种可能性:

  1. 正在运行其他可以更新文档的东西。
  2. Flink 的 elasticsearch sink 提供 at-least-once 保证,这意味着如果发生故障,sink 有时会在恢复期间执行重复写入。也许这会导致尝试使用过期版本号更新文档。
于 2020-01-17T12:28:56.797 回答