1

目标

我正在尝试在处理程序之间创建一种有点循环的依赖关系,但我无法弄清楚如何正确处理它。我想要实现的是producer -> [handlers 1-3] -> handler 4.

所以,disruptor.handleEventsWith(h1, h2, h3).then(h4);。但我有额外的要求

  1. 虽然处理程序 1-3 确实并行处理消息,但它们都不会开始处理下一条消息,直到它们都完成了前一条消息。
  2. 在第一条消息之后,处理程序 1-3 等待处理程序 4 在处理下一条消息之前完成最近的消息。

使用单个事件处理程序的等效执行逻辑可以是:

disruptor.handleEventsWith((event, sequence, endOfBatch) -> {
  Arrays.asList(h1, h2, h3).parallelStream()
        .forEach(h -> h.onEvent(event, sequence, endOfBatch));
  h4.onEvent(event, sequence, endOfBatch);
});

语境

设计上下文是处理程序 1-3 各自根据消息更新自己的状态,并且在三个处理程序中的每一个处理消息后,它们处于一致状态。处理程序 4 然后根据处理程序 1-3 更新的状态运行一些逻辑。所以处理程序 4 应该只看到由 1-3 维护的数据结构的一致状态,这意味着处理程序 1-3 不应该在处理程序 4 完成之前处理下一条消息。

(虽然目标肯定是使用 Disruptor 来管理并发,而不是java.util.Stream。)

不确定它是否重要,但处理程序 4 的逻辑也可以分为两部分,一个要求不更新处理程序 1-3,下一个只要求处理程序 4 的第一部分已完成。因此,处理程序 1-3 可以在处理程序 4 的第二部分仍在执行时处理消息。

有没有办法做到这一点?或者我的设计有缺陷?我觉得应该有一种方法可以通过SequenceBarrier但我不太明白如何实现这个自定义屏障。对于处理程序 1-3,我想我想用逻辑做一个障碍handlers[1:3].lastProcessedSequence() == handlers[4].lastProcessedSequence(),但我不知道该把逻辑放在哪里。

谢谢!

4

1 回答 1

1

我会考虑让处理程序是无状态的,并使用它们处理的消息来包含系统的状态。这样你就根本不需要同步你的处理程序。

于 2017-08-25T18:10:07.880 回答