1

我的项目中有一个环形缓冲区,许多发布者将在其中发布事件(例如 500 个发布者),并且我有 3 个 EventProcessor 应该按顺序处理事件。所有事件都应该以这种方式传递:

{很多发布者} -> {UpStreamProcessor} -> {DownStreamProcessor} -> {logProcessor}

问题是我在 UpStreamProcessor 的发布和启动之间以及 UpStreamProcessor 的结束到 DownStreamProcessor 的启动之间传递事件时浪费了很多时间。

例如,当我有 500 个发布者时,UpStreamProcessor 和 DownStreamProcessor 的处理平均持续 1ms,而 UpStreamProcessor 完成时间到 DownStreamProcessor 开始时间之间持续 400ms。

这是构建环形缓冲区和处理器的代码:

SequenceBarrier sequenceBarrier;

receiveBuffer = new RingBuffer<>(
    MessageContext.FACTORY, 
    new MultiThreadedLowContentionClaimStrategy(inputBufferSize),
    new YieldingWaitStrategy()
);

upStreamAgentProcessor = new BatchEventProcessor<>(
    receiveBuffer,
    receiveBuffer.newBarrier(),
    new UpStreamAgent()
);
sequenceBarrier = receiveBuffer.newBarrier(
    upStreamAgentProcessor.getSequence()
);

downStreamAgentProcessor = new BatchEventProcessor<MessageContext>(
    receiveBuffer,
    sequenceBarrier,
    new DownStreamAgent()
);
sequenceBarrier = receiveBuffer.newBarrier(
    downStreamAgentProcessor.getSequence()
);

logMapAgentProcessor = new BatchEventProcessor<MessageContext>(
    receiveBuffer,
    sequenceBarrier,
    LogMap.getInstance()
);


receiveBuffer.setGatingSequences(logMapAgentProcessor.getSequence());

operationalExecutor.submit(upStreamAgentProcessor);
operationalExecutor.submit(downStreamAgentProcessor);
operationalExecutor.submit(logMapAgentProcessor);
4

1 回答 1

2

Disruptor 旨在处理需要 0.0001 毫秒的消息 如果 1 毫秒甚至 0.1 毫秒的延迟不打扰您,我会使用普通的 ExecutorService。如果您看到延迟或超过 0.001 毫秒,则不太可能是干扰因素,并且您正在执行的任务花费的时间太长。

这是关于协调遗漏的一个很好的介绍。http://www.infoq.com/presentations/latency-pitfalls坏消息是,如果你有一个瓶颈会减慢生产者的速度,那么延迟可能比你测量的要糟糕得多。

于 2013-10-30T10:10:20.743 回答