我的项目中有一个环形缓冲区,许多发布者将在其中发布事件(例如 500 个发布者),并且我有 3 个 EventProcessor 应该按顺序处理事件。所有事件都应该以这种方式传递:
{很多发布者} -> {UpStreamProcessor} -> {DownStreamProcessor} -> {logProcessor}
问题是我在 UpStreamProcessor 的发布和启动之间以及 UpStreamProcessor 的结束到 DownStreamProcessor 的启动之间传递事件时浪费了很多时间。
例如,当我有 500 个发布者时,UpStreamProcessor 和 DownStreamProcessor 的处理平均持续 1ms,而 UpStreamProcessor 完成时间到 DownStreamProcessor 开始时间之间持续 400ms。
这是构建环形缓冲区和处理器的代码:
SequenceBarrier sequenceBarrier;
receiveBuffer = new RingBuffer<>(
MessageContext.FACTORY,
new MultiThreadedLowContentionClaimStrategy(inputBufferSize),
new YieldingWaitStrategy()
);
upStreamAgentProcessor = new BatchEventProcessor<>(
receiveBuffer,
receiveBuffer.newBarrier(),
new UpStreamAgent()
);
sequenceBarrier = receiveBuffer.newBarrier(
upStreamAgentProcessor.getSequence()
);
downStreamAgentProcessor = new BatchEventProcessor<MessageContext>(
receiveBuffer,
sequenceBarrier,
new DownStreamAgent()
);
sequenceBarrier = receiveBuffer.newBarrier(
downStreamAgentProcessor.getSequence()
);
logMapAgentProcessor = new BatchEventProcessor<MessageContext>(
receiveBuffer,
sequenceBarrier,
LogMap.getInstance()
);
receiveBuffer.setGatingSequences(logMapAgentProcessor.getSequence());
operationalExecutor.submit(upStreamAgentProcessor);
operationalExecutor.submit(downStreamAgentProcessor);
operationalExecutor.submit(logMapAgentProcessor);