1

在一个简单java.util.List的输入中缓冲事件的自定义处理器process()- 此缓冲区不是状态存储。

每 30 秒 WALL_CLOCK_TIMEpunctuate()对该列表进行排序并刷新到接收器。假设只有单个分区源和接收器。需要EOS处理保证。

我知道在任何给定时间要么process()被执行要么punctuate()被执行。

我担心这个缓冲区不受变更日志主题的支持。理想情况下,我认为这应该是支持 EOS 的国有商店。

但是有一个论点是设置commit.interval为超过 30 秒 - 即 40 秒,将确保缓冲区中的事件永远不会丢失。而且由于我们使用WALL_CLOCK_TIMEpunctuate()是 ,无论我们是否有事件,都将始终每 30 秒调用一次。

这是一个有效的论点吗?这里有哪些情况会使缓冲区中的事件永远丢失?

@Override
public void init(ProcessorContext processorContext) {
    super.init(processorContext);
    this.buffer = new ArrayList<>();
    context().schedule(Duration.ofSeconds(20L), PunctuationType.WALL_CLOCK_TIME, this::flush);
}

void flush(long timestamp){
    LOG.info("Punctuator invoked.....");
    buffer.stream().sorted(Comparator.comparing(o -> o.getId())).forEach(
            i -> context().forward(i.getId(), i)
    );
}

@Override
public void process(String key, Customer value) {
    LOG.info("Processing {}", key);
    buffer.add(value);
}
4

1 回答 1

1

我有点想出一些反对调整提交和标点间隔的论点,并称这个设置是万无一失的。

来自文档,在 WALL_CLOCK_TIME

这只是最大的努力,因为它的粒度受处理循环的迭代完成所需的时间限制

如果出现以下情况,可能会“错过”标点符号:使用 PunctuationType#WALL_CLOCK_TIME,在 GC 暂停时,间隔太短

理想的 :

标点符号:|-------20s--------|--------20s-------|------20s-------| ------20s--------|

通讯它:|------------30s------------|------------30s------- ----|------------30s---

process()花了太多时间(比如 18 秒),所以punctuate()在第 40 秒第二次运行时没有调用 - 因为正如文档所述,间隔太短。

现在在第 31 秒,如果应用程序崩溃,即使启用了 eos,缓冲区中的事件也会在源处提交。重新启动时,缓冲区将丢失。

标点:|--------20s--------|------process()---------20s--------|---- --20s--------|

通讯它:|------------30s------------|------------30s------- ------|------------30s---

因此,调整提交和标点间隔会抑制对状态存储的需求的论点是无效的。

于 2020-07-01T12:39:32.997 回答