目标
我正在尝试在处理程序之间创建一种有点循环的依赖关系,但我无法弄清楚如何正确处理它。我想要实现的是producer -> [handlers 1-3] -> handler 4
.
所以,disruptor.handleEventsWith(h1, h2, h3).then(h4);
。但我有额外的要求
- 虽然处理程序 1-3 确实并行处理消息,但它们都不会开始处理下一条消息,直到它们都完成了前一条消息。
- 在第一条消息之后,处理程序 1-3 等待处理程序 4 在处理下一条消息之前完成最近的消息。
使用单个事件处理程序的等效执行逻辑可以是:
disruptor.handleEventsWith((event, sequence, endOfBatch) -> {
Arrays.asList(h1, h2, h3).parallelStream()
.forEach(h -> h.onEvent(event, sequence, endOfBatch));
h4.onEvent(event, sequence, endOfBatch);
});
语境
设计上下文是处理程序 1-3 各自根据消息更新自己的状态,并且在三个处理程序中的每一个处理消息后,它们处于一致状态。处理程序 4 然后根据处理程序 1-3 更新的状态运行一些逻辑。所以处理程序 4 应该只看到由 1-3 维护的数据结构的一致状态,这意味着处理程序 1-3 不应该在处理程序 4 完成之前处理下一条消息。
(虽然目标肯定是使用 Disruptor 来管理并发,而不是java.util.Stream
。)
不确定它是否重要,但处理程序 4 的逻辑也可以分为两部分,一个要求不更新处理程序 1-3,下一个只要求处理程序 4 的第一部分已完成。因此,处理程序 1-3 可以在处理程序 4 的第二部分仍在执行时处理消息。
有没有办法做到这一点?或者我的设计有缺陷?我觉得应该有一种方法可以通过SequenceBarrier
但我不太明白如何实现这个自定义屏障。对于处理程序 1-3,我想我想用逻辑做一个障碍handlers[1:3].lastProcessedSequence() == handlers[4].lastProcessedSequence()
,但我不知道该把逻辑放在哪里。
谢谢!