我正在使用 Scala 的 Disruptor,有一个奇怪的问题:链中的最后一个处理程序(无论它是哪个处理程序或链有多长)没有被调用。这就是我正在做的事情:
val handlers = Seq(handler1, handler2, handler3)
val firstHandler = input.disruptor.handleEventsWith(handlers.head)
handlers.tail.foldLeft[EventHandlerGroup[IntMessage]](firstHandler) {
(prev,handler) =>
prev.then(handler)
}
在调试器中,我可以看到最后一个处理程序已添加到消费者存储库,但不知何故从未调用 onEvent。我正在使用 CachedThreadPool 执行器。
我怎样才能使这项工作?