我发现github上Lmax Disrupter的用户指南很简单,现在我有一个生产者和五个消费者的问题,之后我需要总结消费者的结果,有没有演示,如何找到Lmax Disruptor diamond (一个生产者5个消费者1个结论)例子?
非常感谢!
我发现github上Lmax Disrupter的用户指南很简单,现在我有一个生产者和五个消费者的问题,之后我需要总结消费者的结果,有没有演示,如何找到Lmax Disruptor diamond (一个生产者5个消费者1个结论)例子?
非常感谢!
您可以通过 varags 向Disruptor.handleEventsWith
. 然后通过调用then
(fluent DSL)注册结论。第二个调用确保事件在传递到结束步骤之前由所有消费者处理。
一个工作示例可能如下所示:
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.*;
import java.util.concurrent.*;
public class Diamond {
public static void main(String[] args) {
ExecutorService executor = Executors.newCachedThreadPool();
Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, 1024, executor, ProducerType.SINGLE, new SleepingWaitStrategy());
//register five consumers and a final conclude
disruptor.handleEventsWith(new Consumer(1), new Consumer(2), new Consumer(3), new Consumer(4), new Consumer(5)).then(new Conclude());
disruptor.start();
for (int i = 0; i < 3; i++) {
disruptor.publishEvent((event, sequence, newValue) -> event.setValue(newValue), i);
}
disruptor.shutdown();
executor.shutdown();
}
public static class Consumer implements EventHandler<LongEvent> {
private int i;
public Consumer(int i) { this.i = i; }
@Override
public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
System.out.println("Consumer: " + i);
event.setValue(event.getValue() + 1);
}
}
public static class Conclude implements EventHandler<LongEvent> {
@Override
public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
System.out.println("Conclude: " + event.getValue());
}
}
public static class LongEvent
{
private long value;
public void setValue(long value)
{
this.value = value;
}
public long getValue() {
return this.value;
}
}
}
事件只包含一个长值。消费者增加值,最后一步打印它。该for
循环将初始值为 1、2 和 3 的三个事件放入环中。
请注意,您不需要同步 中的工作,LongEvent
因为Consumer
环形缓冲区可确保一次只有一个处理程序在处理事件。此外请注意消费者的印刷品在几次运行中是如何变化的。