0

我最近被介绍到 LMAX Disruptor 并决定试一试。感谢开发人员,设置快速且轻松。但我认为如果有人可以帮助我解决这个问题,我会遇到问题。

问题: 有人告诉我,当生产者发布事件时,它应该阻塞,直到消费者有机会在环绕之前检索它。我在消费者端有一个序列屏障,我可以确认如果生产者没有发布数据,消费者的 waitFor 调用将阻塞。但是,生产者似乎不受任何监管,只会环绕并覆盖环形缓冲区中未处理的数据。

我有一个生产者作为在单独线程上运行的可运行对象。

public class Producer implements Runnable {
    private final RingBuffer<Event> ringbuffer;
    public Producer(RingBuffer<Event> rb) {
        ringbuffer = rb;
    }
    public void run() {
           long next = 0L;
           while(true) {
               try {
                   next = ringbuffer.next();
                   Event e = ringbuffer.get(next);
                   ... do stuff...
                   e.set(... stuff...);
               }
               finally {
                   ringbuffer.publish(next);
               }
           }
    }
}

我有一个消费者在主线程上运行。

public class Consumer {
     private final ExecutorService exec;
     private final Disruptor<Event> disruptor;
     private final RingBuffer<Event> ringbuffer;
     private final SequenceBarrier seqbar;
     private long seq = 0L;

     public Consumer() {
         exec = Executors.newCachedThreadPool();
         disruptor = new Disruptor<>(Event.EVENT_FACTORY, 1024, Executors.defaultThreadFactory());
         ringbuffer = disruptor.start();
         seqbar = ringbuffer.newBarrier();

         Producer producer = new Producer(ringbuffer);
         exec.submit(producer);
    }

    public Data getData() {
         seqbar.waitFor(seq);
         Event e = ringbuffer.get(seq);
         seq++;
         return e.get();
    }
}

最后,我像这样运行代码:

public class DisruptorTest {
     public static void main(String[] args){
         Consumer c = new Consumer();
         while (true) {
             c.getData();
             ... Do stuff ...
         }
}
4

1 回答 1

0

您需要向com.lmax.disruptor.SequenceringBuffer 添加一个门控序列 ( ),该序列必须根据您的消费者所在的点进行更新。

您可以使用 EventHandler 接口并使用提供的 BatchEventProcessor( ) 来实现事件处理,该 BatchEventProcessor( com.lmax.disruptor.BatchEventProcessor.BatchEventProcessor) 带有内置序列

这是一个完整的工作示例

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.dsl.Disruptor;

public class Main {

   static class Event {

      int id;
   }

   static class Producer implements Runnable {

      private final RingBuffer<Event> ringbuffer;

      public Producer(RingBuffer<Event> rb) {
         ringbuffer = rb;
      }

      @Override
      public void run() {
         long next = 0L;
         int id = 0;
         while (true) {
            try {
               next = ringbuffer.next();
               Event e = ringbuffer.get(next);
               e.id = id++;
            } finally {
               ringbuffer.publish(next);
            }
         }
      }
   }

   static class Consumer {

      private final ExecutorService exec;
      private final Disruptor<Event> disruptor;
      private final RingBuffer<Event> ringbuffer;
      private final SequenceBarrier seqbar;
      private BatchEventProcessor<Event> processor;

      public Consumer() {
         exec = Executors.newCachedThreadPool();
         disruptor = new Disruptor<>(() -> new Event(), 1024, Executors.defaultThreadFactory());
         ringbuffer = disruptor.start();
         seqbar = ringbuffer.newBarrier();

         processor = new BatchEventProcessor<Main.Event>(
               ringbuffer, seqbar, new Handler());
         ringbuffer.addGatingSequences(processor.getSequence());

         Producer producer = new Producer(ringbuffer);
         exec.submit(producer);
      }
   }

   static class Handler implements EventHandler<Event> {

      @Override
      public void onEvent(Event event, long sequence, boolean endOfBatch) throws Exception {
         System.out.println("Handling event " + event.id);
      }

   }

   public static void main(String[] args) throws Exception {

      Consumer c = new Consumer();
      while (true) {
         c.processor.run();
      }
   }
}
于 2018-05-27T10:19:22.987 回答