
I have recently been learning about the LMAX Disruptor and doing some experiments. One thing that puzzles me is the endOfBatch parameter of the EventHandler's onEvent method. Consider the following code. First, the dummy message and consumer classes, which I call Test1 and Test1Worker:

public class Test1 {

}

import com.lmax.disruptor.EventHandler;

public class Test1Worker implements EventHandler<Test1>{
    public void onEvent(Test1 event, long sequence, boolean endOfBatch) {
        try{
            // Stand-in for 500 ms of real per-event work
            Thread.sleep(500);
        }
        catch(Exception e){
            e.printStackTrace();
        }
        System.out.println("Received message with sequence " + sequence + ". "
                + "EndOfBatch = " + endOfBatch);
    }
}

Notice that I have put in a delay of 500 milliseconds as a substitute for some real-world work. I am also printing to the console the sequence number of the received message and whether it is flagged as the end of a batch.

And then my driver class (which is acting as the producer) called DisruptorTest:

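import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DisruptorTest {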

    private static Disruptor<Test1> bus1;

    private static ExecutorService test1Workers;

    public static void main(String[] args){
        test1Workers = Executors.newFixedThreadPool(1);

        // Ring buffer with 8 slots, served by a single consumer thread
        bus1 = new Disruptor<Test1>(new Test1Factory(), 8, test1Workers);
        bus1.handleEventsWith(new Test1Worker());
        RingBuffer<Test1> buf1 = bus1.start();

        for (int i = 0; i < 10; i++){
            long a = System.currentTimeMillis();
            long next = buf1.next();
            long b = System.currentTimeMillis();
            System.out.println("Delay for claiming slot " + i + " is "+ (b - a));
            try {
                // A real producer would fill in the event's fields here
                Test1 message = buf1.get(next);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                buf1.publish(next);
            }
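        }

        // Wait for the handler to drain the remaining events, then stop the
        // executor's worker thread so the JVM can exit
        bus1.shutdown();
        test1Workers.shutdown();
    }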

    public static class Test1Factory implements EventFactory<Test1> {
        public Test1 newInstance() {
            return new Test1();
        }

    }   
}

Here, after initializing the required objects, I feed 10 messages into the RingBuffer (buffer size 8) and monitor two things: the producer's delay in claiming the next slot in the RingBuffer, and, on the consumer side, each message's sequence number along with whether that sequence is flagged as the end of a batch.

Interestingly, with the 500 ms processing delay per message, this is the output I get:

Delay for claiming slot 0 is 0
Delay for claiming slot 1 is 0
Delay for claiming slot 2 is 0
Delay for claiming slot 3 is 0
Delay for claiming slot 4 is 0
Delay for claiming slot 5 is 0
Delay for claiming slot 6 is 0
Delay for claiming slot 7 is 0
Received message with sequence 0. EndOfBatch = true
Delay for claiming slot 8 is 505
Received message with sequence 1. EndOfBatch = false
Received message with sequence 2. EndOfBatch = false
Received message with sequence 3. EndOfBatch = false
Received message with sequence 4. EndOfBatch = false
Received message with sequence 5. EndOfBatch = false
Received message with sequence 6. EndOfBatch = false
Received message with sequence 7. EndOfBatch = true
Delay for claiming slot 9 is 3519
Received message with sequence 8. EndOfBatch = true
Received message with sequence 9. EndOfBatch = true

However, if I remove the 500 ms wait time, this is what I get:

Delay for claiming slot 0 is 0
Delay for claiming slot 1 is 0
Received message with sequence 0. EndOfBatch = true
Received message with sequence 1. EndOfBatch = true
Delay for claiming slot 2 is 0
Received message with sequence 2. EndOfBatch = true
Delay for claiming slot 3 is 0
Received message with sequence 3. EndOfBatch = true
Delay for claiming slot 4 is 0
Received message with sequence 4. EndOfBatch = true
Delay for claiming slot 5 is 0
Received message with sequence 5. EndOfBatch = true
Delay for claiming slot 6 is 0
Received message with sequence 6. EndOfBatch = true
Delay for claiming slot 7 is 0
Received message with sequence 7. EndOfBatch = true
Delay for claiming slot 8 is 1
Delay for claiming slot 9 is 0
Received message with sequence 8. EndOfBatch = false
Received message with sequence 9. EndOfBatch = true  

So it looks like whether a message is considered the end of a batch (i.e., the size of a batch) is influenced by the consumer's message-processing delay. Maybe I am being stupid here, but is that how it should be? What is the reasoning behind it? And what determines the batch size in general? Thanks in advance, and let me know if anything in my question is unclear.


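The batch size is determined solely by the number of elements available. If more elements are available at that moment, they are included in the batch. For example, if the Disruptor calls your code when there is only one element in the queue, you will get a single call with endOfBatch=true. If there are 8 elements in the queue, it will take all 8 and deliver them as one batch: 8 calls to onEvent, with endOfBatch=true only on the last one.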
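To see why the consumer's speed changes the batch boundaries, here is a minimal, hypothetical timing model in plain Java (no Disruptor dependency; `BatchSplitModel`, `batches` and `describe` are illustrative names, not library API). It assumes each event i is published at publishTime[i], that waiting for the next sequence returns as soon as that sequence is published and reports the highest sequence published by then, and that the consumer spends a fixed cost on each event. It ignores the ring buffer's capacity.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of how batch boundaries emerge; not Disruptor code.
final class BatchSplitModel {

    /** Returns the batches the consumer would see, as [first, last] pairs. */
    static List<long[]> batches(long[] publishTime, long costPerEvent) {
        List<long[]> result = new ArrayList<>();
        long clock = 0; // time at which the consumer is free to wait again
        int next = 0;   // next sequence the consumer wants
        while (next < publishTime.length) {
            // Waiting for 'next' returns once it has been published...
            clock = Math.max(clock, publishTime[next]);
            // ...and reports the highest sequence published by that moment.
            int available = next;
            while (available + 1 < publishTime.length && publishTime[available + 1] <= clock) {
                available++;
            }
            result.add(new long[] {next, available});
            // Processing the whole batch keeps the consumer busy for this long.
            clock += (available - next + 1) * costPerEvent;
            next = available + 1;
        }
        return result;
    }

    static String describe(List<long[]> batches) {
        StringBuilder sb = new StringBuilder();
        for (long[] b : batches) {
            sb.append('[').append(b[0]).append("..").append(b[1]).append(']');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        long[] publishTime = new long[10];
        for (int i = 0; i < publishTime.length; i++) {
            publishTime[i] = i; // one event published per millisecond
        }
        // prints [0..0][1..1][2..2][3..3][4..4][5..5][6..6][7..7][8..8][9..9]
        System.out.println(describe(batches(publishTime, 0)));
        // prints [0..0][1..9]
        System.out.println(describe(batches(publishTime, 500)));
    }
}
```

A fast consumer is always waiting when the next event arrives, so every batch has one event. A slow consumer finds everything published while it was busy and takes it as one batch. In the question the second batch stops at 7 rather than 9 because the 8-slot buffer kept slots 8 and 9 from being published in time, which this model deliberately leaves out.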

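You can see in the code below that the processor fetches the highest available sequence, which may be well beyond the next sequence it is waiting for. For example, if you have just processed 5 and are waiting for slot 6, and then 3 events arrive, availableSequence will be 8 and you will receive multiple calls (for 6, 7 and 8).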

https://github.com/LMAX-Exchange/disruptor/blob/master/src/main/java/com/lmax/disruptor/BatchEventProcessor.java#L124

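// Block until nextSequence is published; the returned value is the highest
// sequence available so far, which may be larger than nextSequence
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
// Hand every available event to the handler; only the last one is flagged endOfBatch
while (nextSequence <= availableSequence)
{
    event = dataProvider.get(nextSequence);
    eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
    nextSequence++;
}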
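The quoted loop can be exercised on its own with a small stand-in. This is a sketch, not the library class: `BatchLoopSketch`, its `Handler` interface and `deliver` are hypothetical names, and the handler receives only the sequence and the flag rather than an event object. It is fed the example above, where the consumer is at 5, waiting for 6, and 6, 7 and 8 have been published, so availableSequence is 8.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the batch loop; not the Disruptor's BatchEventProcessor.
final class BatchLoopSketch {

    /** Hypothetical replacement for EventHandler.onEvent, without the event object. */
    interface Handler {
        void onEvent(long sequence, boolean endOfBatch);
    }

    /**
     * Mirrors the quoted loop: delivers nextSequence..availableSequence in order,
     * flags only the last one as endOfBatch, and returns the next sequence to wait for.
     */
    static long deliver(long nextSequence, long availableSequence, Handler handler) {
        while (nextSequence <= availableSequence) {
            handler.onEvent(nextSequence, nextSequence == availableSequence);
            nextSequence++;
        }
        return nextSequence;
    }

    public static void main(String[] args) {
        List<String> calls = new ArrayList<>();
        // Consumer is at 5 and waiting for 6; the wait reported 8 as available.
        long next = deliver(6, 8, (sequence, endOfBatch) -> calls.add(sequence + ":" + endOfBatch));
        // prints [6:false, 7:false, 8:true], next = 9
        System.out.println(calls + ", next = " + next);
    }
}
```

A batch of one, such as deliver(0, 0, ...), produces a single call with endOfBatch = true, which is what the fast-consumer run in the question shows for most sequences.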

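Regarding the roughly 500 ms pause when claiming slot 8 (the ninth element), note that the Disruptor is built on a ring buffer, and you have specified 8 as the number of slots in the buffer (see the second argument here):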

bus1 = new Disruptor<Test1>(new Test1Factory(), 8, test1Workers);  

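If the ring buffer is full (all 8 slots hold events that not every consumer has processed yet), the producer is blocked from publishing new events until a slot is freed. You can try increasing the buffer size, for example to around 2 million entries (the size must be a power of two, such as 2^21 = 2,097,152), or make sure your consumers are faster than the producer so the buffer never fills up (for example, by removing the sleep, as you have already demonstrated).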
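The full buffer also accounts for slot 9 waiting about 3.5 s rather than 500 ms. Below is a hypothetical timing model in plain Java (no Disruptor dependency; `ClaimDelayModel` and `claimDelays` are illustrative names). It assumes, as in Disruptor 3.x, that the producer may claim sequence s only once the consumer has reported at least s - bufferSize, and that BatchEventProcessor reports its progress (sequence.set(availableSequence)) only after finishing a whole batch. Claiming and publishing are treated as instantaneous, and the batch boundaries are taken from the question's 500 ms output rather than derived.

```java
import java.util.Arrays;

// Hypothetical timing model of producer claim delays; not Disruptor code.
final class ClaimDelayModel {

    /**
     * Returns how long the producer waits to claim each slot. 'batches' must be
     * contiguous [first, last] pairs covering 0..last, in order.
     */
    static long[] claimDelays(int bufferSize, long perEventMillis, long[][] batches) {
        int count = (int) batches[batches.length - 1][1] + 1;
        long[] delay = new long[count];
        long[] publishTime = new long[count];
        long[] reportedAt = new long[count]; // when the consumer's reported sequence reaches s
        long producerClock = 0;
        long consumerClock = 0;
        long reportedUpTo = -1;              // highest sequence the consumer has reported
        int nextBatch = 0;

        for (int s = 0; s < count; s++) {
            // Assumed rule: slot s is reusable once the consumer has reported s - bufferSize.
            long wrapPoint = s - bufferSize;
            long claimAt = producerClock;
            if (wrapPoint >= 0) {
                if (wrapPoint > reportedUpTo) {
                    throw new IllegalArgumentException("batches are inconsistent with bufferSize");
                }
                claimAt = Math.max(claimAt, reportedAt[(int) wrapPoint]);
            }
            delay[s] = claimAt - producerClock;
            producerClock = claimAt;
            publishTime[s] = claimAt;

            // Once a batch's last event is published, the consumer can run that batch.
            if (nextBatch < batches.length && batches[nextBatch][1] == s) {
                int first = (int) batches[nextBatch][0];
                long start = Math.max(consumerClock, publishTime[s]);
                consumerClock = start + (s - first + 1) * perEventMillis;
                // Assumed: progress becomes visible only after the whole batch.
                for (int q = first; q <= s; q++) {
                    reportedAt[q] = consumerClock;
                }
                reportedUpTo = s;
                nextBatch++;
            }
        }
        return delay;
    }

    public static void main(String[] args) {
        // Batch boundaries copied from the question's output with the 500 ms sleep.
        long[][] batches = {{0, 0}, {1, 7}, {8, 8}, {9, 9}};
        // prints [0, 0, 0, 0, 0, 0, 0, 0, 500, 3500]
        System.out.println(Arrays.toString(claimDelays(8, 500, batches)));
    }
}
```

The modelled waits of 500 ms for slot 8 and 3500 ms for slot 9 line up with the 505 ms and 3519 ms in the question: slot 8 frees up once the one-event batch [0] is reported, but slot 9 has to wait until the seven-event batch [1..7] (7 × 500 ms) is reported as a whole. With a buffer of 16 slots, the same model shows no waits at all for 10 events.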

answered 2015-11-15T06:27:02.747