我最近被介绍到 LMAX Disruptor 并决定试一试。感谢开发人员,设置快速且轻松。但我认为如果有人可以帮助我解决这个问题,我会遇到问题。
问题: 有人告诉我,当生产者发布事件时,它应该阻塞,直到消费者有机会在环绕之前检索它。我在消费者端有一个序列屏障,我可以确认如果生产者没有发布数据,消费者的 waitFor 调用将阻塞。但是,生产者似乎不受任何监管,只会环绕并覆盖环形缓冲区中未处理的数据。
我有一个生产者作为在单独线程上运行的可运行对象。
public class Producer implements Runnable {
private final RingBuffer<Event> ringbuffer;
public Producer(RingBuffer<Event> rb) {
ringbuffer = rb;
}
public void run() {
long next = 0L;
while(true) {
try {
next = ringbuffer.next();
Event e = ringbuffer.get(next);
... do stuff...
e.set(... stuff...);
}
finally {
ringbuffer.publish(next);
}
}
}
}
我有一个消费者在主线程上运行。
public class Consumer {
private final ExecutorService exec;
private final Disruptor<Event> disruptor;
private final RingBuffer<Event> ringbuffer;
private final SequenceBarrier seqbar;
private long seq = 0L;
public Consumer() {
exec = Executors.newCachedThreadPool();
disruptor = new Disruptor<>(Event.EVENT_FACTORY, 1024, Executors.defaultThreadFactory());
ringbuffer = disruptor.start();
seqbar = ringbuffer.newBarrier();
Producer producer = new Producer(ringbuffer);
exec.submit(producer);
}
public Data getData() {
seqbar.waitFor(seq);
Event e = ringbuffer.get(seq);
seq++;
return e.get();
}
}
最后,我像这样运行代码:
public class DisruptorTest {
public static void main(String[] args){
Consumer c = new Consumer();
while (true) {
c.getData();
... Do stuff ...
}
}