2

我对破坏者有以下问题:

  1. 消费者(事件处理器)没有实现他们实现 EventHandler 的任何 Callable 或 Runnable 接口,那么它们如何并行运行,所以例如我有一个中断器实现,其中有一个像这样的菱形图案
     c1
P1 - c2 - c4 - c5
     c3

其中c1到c3可以在p1之后并行工作,C4和C5在它们之后工作。

所以通常我会有这样的东西(P1和C1-C5是runnables/callables)

p1.start();
p1.join();

c1.start();
c2.start();
c3.start();
c1.join();
c2.join();
c3.join();

c4.start();
c4.join();
c5.start();
c5.join();

但是在 Disruptor 的情况下,我的事件处理程序都没有实现 Runnable 或 Callable,那么中断器框架最终是如何并行运行它们的呢?

采取以下场景:

我的消费者 C2 需要对事件进行一些注释的 Web 服务调用,在 SEDA 中,我可以为这样的 10 个 C2 请求启动 10 个线程 [用于将消息拉出队列 + 进行 Web 服务调用并更新下一个 SEDA 队列],这将确保我不会按顺序等待 10 个请求中的每一个的 Web 服务响应,在这种情况下,我的事件处理器 C2(如果)是单个实例,将按顺序等待 10 个 C2 请求。

4

2 回答 2

5

您的 EventHandler 被组合成一个 BatchEventProcessor 的实例,它是一个 Runnable。

  • BatchEventProcessor 实现了 run() 循环。
  • 它负责从环形缓冲区获取更新的事件序列号。
  • 当事件可用于处理时,它会将它们传递给您的 EventHandler 进行处理。

使用 DSL 时,Disruptor 负责通过 Executor 实例创建这些线程。

  • 您在 Disruptor 构造函数中提供了一个 Executor。
  • 您在 DSL 上使用 and/then/etc 提供您的 EventHandlers 列表。
  • 然后,调用 Disruptor start() 方法。作为 start() 方法的一部分,每个 EventProcessor (Runnable) 都被提交给 Executor。
  • Executor 将依次在线程上启动您的 EventProcessor/EventHandler。

关于您的特定场景(即:长时间运行的事件处理程序),您可能会参考这个问题:

于 2013-06-10T12:21:04.917 回答
0

Disruptor 默认策略是多线程的,所以如果你的每个处理器都在不同的 Handler(消费者)中工作,那么应该没问题,并且你的处理器是多线程的。

于 2014-06-06T13:53:04.237 回答