1

我在我的 Web 应用程序中使用 LMAX 中断器,它接受 http 请求参数并将它们处理到环形缓冲区。3 个事件处理程序处理和处理数据,最后一个将其保存到数据库中。当 servlet 被实例化时,初始化一次 ringbuffer。这是正确的吗?

public void init() throws ServletException {

    Disruptor<CampaignCode> disruptor = new Disruptor<CampaignCode>(
            CampaignCode.FACTORY, 1024, Executors.newCachedThreadPool());

    EventHandler<CampaignCode> campaignDetailsLoader = new CampaignDetailsLoaderEvent();
    EventHandler<CampaignCode> templateEvent = new TemplateBuildEvent();
    EventHandler<CampaignCode> codeGenerator = new CodeGenerationEventHandler();
    EventHandler<CampaignCode> campaignSaveEventHandler= new CampaignSaveEventHandler();

    disruptor.handleEventsWith(templateEvent, campaignDetailsLoader).then(
            codeGenerator).then(campaignSaveEventHandler);
    this.ringBuffer = disruptor.start();
}

在这里,我将值直接放入 ringbuffer

    @Override
protected void doPost(HttpServletRequest request,
        HttpServletResponse response) throws ServletException, IOException      {
    String campaignId = request.getParameter("campaignId");
    String campaignType = request.getParameter("campaignType");
    if (campaignId != null && !campaignId.isEmpty()) {
        long sequence = ringBuffer.next();
        CampaignCode campaign = ringBuffer.get(sequence);
        campaign.setCampaignId(Long.parseLong(campaignId));
        campaign.setCampaignType(campaignType);
        ringBuffer.publish(sequence);
    }
  }

事件处理程序

public class CampaignDetailsLoaderEvent implements EventHandler<CampaignCode> {
     @Override
public void onEvent(CampaignCode event, long sequence, boolean endOfBatch)
        throws Exception {
         //load details from db and process
         // udpate values to the event object
  }
 }

  public class TemplateBuildEvent implements EventHandler<CampaignCode> {
     @Override
public void onEvent(CampaignCode event, long sequence, boolean endOfBatch)
        throws Exception {
         // find the template of this type
         // set template to the event object
  }
 }

 public class CodeGenerationEventHandler implements EventHandler<CampaignCode> {
     @Override
public void onEvent(CampaignCode event, long sequence, boolean endOfBatch)
        throws Exception {
         // generate custom dsl code and execute it
         // update the output to the event object 
         //next handler will save it the db
  }
 }

  public class CampaignSaveEventHandler implements EventHandler<CampaignCode> {
     @Override
public void onEvent(CampaignCode event, long sequence, boolean endOfBatch)
        throws Exception {
         // save the details to db
         // done!
  }
 }

这是发布到环形缓冲区的正确方法吗?我需要同步“ringBuffer”对象吗?前 2 个事件并行运行,然后是第 3 个事件。当我有快速的发布者和缓慢的消费者时,我应该如何处理这个问题?我正在使用disruptor 3.1.1,我在web 环境中找不到很好的disruptor 使用示例。一个简单的代码实现,如果你做过,会帮助我理解很多!

4

1 回答 1

1

鉴于您所述的代码要求,此实现是正确的。最佳实践是将发布代码包装在 try-finally 块中,以确保始终发布声明的序列:

long sequence = ringBuffer.next();
  try {
  Event e = ringBuffer.get(sequence);
  // Do some work with the event.
} finally {
  ringBuffer.publish(sequence);
}

在构造函数中显式指定您需要一个多生产者 Disruptor 可能也是一个好主意,但这已经在您使用的默认构造函数中完成了。您不应写入同步到 ,RingBuffer因为声明和发布序列号的过程已经是线程安全的。但是请注意,不能保证将事件发布到RingBuffer并发调用doPost()的顺序与您的 Web 应用程序接收它们的顺序相同。

Disruptor 只是一个专门的队列,因此会遇到它们无限增长的所有常见问题。如果缓冲区中没有可用的插槽,您的调用ringBuffer.next()将阻塞,直到有一个可用。您应该为处理突发流量提供足够的容量RingBuffer,并考虑在缓冲区已填满(希望很少见)的情况下应用背压的方法。

在您的特定用例中,如果CodeGenerationorCampaignSave步骤与前两个相比花费了很长时间,并且可以推迟,那么使用额外的 Disruptors/RingBuffers 来为这些执行排队事件可能是有意义的。

于 2013-08-23T23:40:08.300 回答