0

我有以下用例:

当我的服务启动时,它可能需要在尽可能短的时间内处理数百万个文档。将有三个数据来源。

我已经设置了以下内容:

    /* batchSize = 100, bufferSize = 2^30
    public MyDisruptor(@NonNull final MyDisruptorConfig config) {
        batchSize = config.getBatchSize();
        bufferSize = config.getBufferSize();
        this.eventHandler = config.getEventHandler();
        ThreadFactory threadFactory = createThreadFactory("disruptor-threads-%d");
        executorService = Executors.newSingleThreadExecutor(threadFactory);
        ringBuffer = RingBuffer.createMultiProducer(new EventFactory(), bufferSize, new YieldingWaitStrategy());
        sequenceBarrier = ringBuffer.newBarrier();
        batchEventProcessor = new BatchEventProcessor<>(ringBuffer, sequenceBarrier, eventHandler);
        ringBuffer.addGatingSequences(batchEventProcessor.getSequence());
        executorService.submit(batchEventProcessor);
    }

    public void consume(@NonNull final List<Document> documents) {
        List<List<Document>> subLists = Lists.partition(documents, batchSize);
        for (List<Document> subList : subLists) {
            log.info("publishing sublist of size {}", subList.size());
            long high = ringBuffer.next(subList.size());
            long low = high - (subList.size() - 1);
            long position = low;
            for (Document document: subList) {
                ringBuffer.get(position++).setEvent(document);
            }
            ringBuffer.publish(low, high);
            lastPublishedSequence.set(high);
        }
    }

我的每个来源调用都消耗,我使用 Guice 创建一个 Singleton 破坏器。

我的 eventHandler 例程是

    public void onEvent(Event event, long sequence, boolean endOfBatch) throws Exception {
        Document document = event.getValue();
        handler.processDocument(document); //send the document to handler
        if (endOfBatch) {
            handler.processDocumentsList(); // tell handler to process all documents so far.
        }
    }

我在日志中看到生产者 ( consume) 有时会停滞不前。我假设这是 ringBuffer 已满,而 eventHandler 无法足够快地处理。我看到 eventHandler 正在处理文档(来自我的日志),然后过了一会儿,生产者开始将更多文档发布到环形缓冲区。

问题:

  • 我是否使用了正确的 Disruptor 模式?我看到有很多方法可以使用它。我选择使用 batchEventProcessor 所以它会发出信号endOfBatch
  • 如何提高 EventHandler 的效率?processDocumentsList 可能很慢。
  • 我应该使用并行事件处理程序吗?lmax用户指南提到这是可能的,FAQ中有一个问题。但是如何将它与 batchEventProcessor 一起使用?它只需要一个事件处理程序。
4

1 回答 1

0

你的handler有状态吗?如果没有,您可以使用多个并行事件处理程序来处理文档。您可以实现一个基本的分片策略,其中只有一个处理程序处理每个事件。

endOfBatch通常用于通过优化受益于批处理的 IO 操作来加快处理速度。例如,在每个事件上写入文件,但仅在endOfBatch.

如果不知道您的文档处理器中发生了什么,就很难给出更多建议。

于 2021-12-10T00:18:50.847 回答