在一个简单java.util.List
的输入中缓冲事件的自定义处理器process()
- 此缓冲区不是状态存储。
每 30 秒 WALL_CLOCK_TIMEpunctuate()
对该列表进行排序并刷新到接收器。假设只有单个分区源和接收器。需要EOS处理保证。
我知道在任何给定时间要么process()
被执行要么punctuate()
被执行。
我担心这个缓冲区不受变更日志主题的支持。理想情况下,我认为这应该是支持 EOS 的国有商店。
但是有一个论点是设置commit.interval
为超过 30 秒 - 即 40 秒,将确保缓冲区中的事件永远不会丢失。而且由于我们使用WALL_CLOCK_TIME
的punctuate()
是 ,无论我们是否有事件,都将始终每 30 秒调用一次。
这是一个有效的论点吗?这里有哪些情况会使缓冲区中的事件永远丢失?
@Override
public void init(ProcessorContext processorContext) {
super.init(processorContext);
this.buffer = new ArrayList<>();
context().schedule(Duration.ofSeconds(20L), PunctuationType.WALL_CLOCK_TIME, this::flush);
}
void flush(long timestamp){
LOG.info("Punctuator invoked.....");
buffer.stream().sorted(Comparator.comparing(o -> o.getId())).forEach(
i -> context().forward(i.getId(), i)
);
}
@Override
public void process(String key, Customer value) {
LOG.info("Processing {}", key);
buffer.add(value);
}