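I have the following use case:
When my service starts, it may need to process millions of documents in as short a time as possible. The documents come from three data sources.
I have set up the following: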
/* batchSize = 100, bufferSize = 2^30 */
public MyDisruptor(@NonNull final MyDisruptorConfig config) {
    batchSize = config.getBatchSize();
    bufferSize = config.getBufferSize();
    this.eventHandler = config.getEventHandler();
    // A single consumer thread runs the BatchEventProcessor.
    ThreadFactory threadFactory = createThreadFactory("disruptor-threads-%d");
    executorService = Executors.newSingleThreadExecutor(threadFactory);
    // Multi-producer ring buffer, because several sources publish concurrently.
    // EventFactory here is my own factory class for Event instances.
    ringBuffer = RingBuffer.createMultiProducer(new EventFactory(), bufferSize, new YieldingWaitStrategy());
    sequenceBarrier = ringBuffer.newBarrier();
    batchEventProcessor = new BatchEventProcessor<>(ringBuffer, sequenceBarrier, eventHandler);
    // Gate producers on the consumer's sequence so unprocessed slots are not overwritten.
    ringBuffer.addGatingSequences(batchEventProcessor.getSequence());
    executorService.submit(batchEventProcessor);
}
public void consume(@NonNull final List<Document> documents) {
    List<List<Document>> subLists = Lists.partition(documents, batchSize);
    for (List<Document> subList : subLists) {
        log.info("publishing sublist of size {}", subList.size());
        // next(n) claims n slots and returns the highest claimed sequence;
        // it waits while the ring buffer has no room for them.
        long high = ringBuffer.next(subList.size());
        long low = high - (subList.size() - 1);
        long position = low;
        for (Document document : subList) {
            ringBuffer.get(position++).setEvent(document);
        }
        // Make the whole claimed range visible to the consumer.
        ringBuffer.publish(low, high);
        lastPublishedSequence.set(high);
    }
}
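For reference, the sequence arithmetic in consume can be checked in isolation. The following is a minimal sketch using only the JDK, not Disruptor code; the class and method names (BatchClaimSketch, lowOf, slotOf) are hypothetical, and it assumes the buffer size is a power of two so that a sequence maps to a slot by masking.

```java
final class BatchClaimSketch {
    /** Lowest sequence of a batch of n slots whose highest claimed sequence is high. */
    public static long lowOf(long high, int n) {
        return high - (n - 1);
    }

    /** Slot index for a sequence, assuming bufferSize is a power of two. */
    public static int slotOf(long sequence, int bufferSize) {
        return (int) (sequence & (bufferSize - 1));
    }

    public static void main(String[] args) {
        int bufferSize = 8;
        long high = 9;              // pretend a claim of 4 slots returned 9
        long low = lowOf(high, 4);  // first sequence of the claimed range
        for (long seq = low; seq <= high; seq++) {
            // Sequences keep growing; only the slot index wraps around.
            System.out.println("sequence " + seq + " -> slot " + slotOf(seq, bufferSize));
        }
    }
}
```

With these numbers the claimed range is sequences 6 to 9, and sequences 8 and 9 wrap around to slots 0 and 1.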
Each of my sources calls consume, and I use Guice to create the Disruptor as a singleton.
My eventHandler routine is:
public void onEvent(Event event, long sequence, boolean endOfBatch) throws Exception {
    Document document = event.getValue();
    handler.processDocument(document); // send the document to handler
    if (endOfBatch) {
        handler.processDocumentsList(); // tell handler to process all documents so far.
    }
}
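As I understand it, endOfBatch is not tied to the 100-document sublists that consume publishes. It is set on the last event that was already available when the consumer checked the barrier. The following is a simplified JDK-only model of that loop, not the library's code; EndOfBatchSketch and flagsFor are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

final class EndOfBatchSketch {
    /**
     * Replays how a batch consumer walks sequences next..available and
     * returns the endOfBatch flag it would pass to the handler for each one.
     */
    public static List<Boolean> flagsFor(long next, long available) {
        List<Boolean> flags = new ArrayList<>();
        for (long seq = next; seq <= available; seq++) {
            flags.add(seq == available); // true only on the last available event
        }
        return flags;
    }

    public static void main(String[] args) {
        // Sequences 5..7 were already published when the consumer woke up.
        System.out.println(flagsFor(5, 7));
    }
}
```

In this model the consumer reports progress only after the whole available range has been handled, so a long batch on the consumer side keeps the producers gated until it finishes.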
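I see in my logs that the producer (consume) sometimes stalls. I assume this happens when the ringBuffer is full and the eventHandler cannot process events fast enough. I can see from my logs that the eventHandler is still processing documents, and after a while the producer starts publishing more documents to the ring buffer.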
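To make the "ring buffer is full" assumption concrete, here is a simplified JDK-only model of the check a producer has to pass before it can claim slots. It is a sketch of the idea, not the library's implementation; RingCapacitySketch, hasCapacity and the parameter names are hypothetical.

```java
final class RingCapacitySketch {
    /**
     * Returns true if `required` more slots can be claimed without overwriting
     * events that the slowest consumer has not finished yet.
     *
     * @param cursor         highest sequence claimed so far by producers
     * @param required       number of slots the producer wants to claim
     * @param bufferSize     number of slots in the ring
     * @param minConsumerSeq lowest sequence reported by any gating consumer
     */
    public static boolean hasCapacity(long cursor, int required, int bufferSize, long minConsumerSeq) {
        // The claim would reuse the slots of every sequence up to wrapPoint.
        long wrapPoint = cursor + required - bufferSize;
        return wrapPoint <= minConsumerSeq;
    }

    public static void main(String[] args) {
        int bufferSize = 8;
        // The consumer has finished sequence 3; producers have claimed up to 9.
        System.out.println(hasCapacity(9, 2, bufferSize, 3)); // 9 + 2 - 8 = 3, fits
        System.out.println(hasCapacity(9, 3, bufferSize, 3)); // 9 + 3 - 8 = 4, would overwrite
    }
}
```

When the check fails, a blocking claim has to wait until the consumer's sequence advances, which would look like the stall seen in the logs.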
Questions: