3

按照Disruptor Getting Started Guide,我构建了一个具有单个生产者和单个消费者的最小破坏者。

制片人

import com.lmax.disruptor.RingBuffer;

public class LongEventProducer
{
    private final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducer(RingBuffer<LongEvent> ringBuffer)
    {
        this.ringBuffer = ringBuffer;
    }

    public void onData()
    {
        long sequence = ringBuffer.next();
        try
        {
            LongEvent event = ringBuffer.get(sequence);
        }
        finally
        {
            ringBuffer.publish(sequence);
        }
    }
}

消费者(注意消费者什么都不做onEvent

import com.lmax.disruptor.EventHandler;

public class LongEventHandler implements EventHandler<LongEvent>
{
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
    {}
}

我的目标是性能测试一次绕过一个大的环形缓冲区,而不是多次遍历一个较小的环形。在每种情况下,总操作数 ( bufferSizeX rotations) 是相同的。我发现随着环形缓冲区变小,操作/秒速率急剧下降。

RingBuffer Size |  Revolutions  | Total Ops   |   Mops/sec

    1048576     |      1        |  1048576    |     50-60

       1024     |      1024     |  1048576    |     8-16

        64      |      16384    |  1048576    |    0.5-0.7

        8       |      131072   |  1048576    |    0.12-0.14

问题: 当环形缓冲区大小减小但总迭代次数固定时,性能大幅下降的原因是什么? 这种趋势是独立于WaitStrategySingle vs MultiProducer- 吞吐量降低,但趋势是相同的。

主要(通知SingleProducerBusySpinWaitStrategy

import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.ProducerType;

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class LongEventMainJava{
        static double ONEMILLION = 1000000.0;
        static double ONEBILLION = 1000000000.0;

    public static void main(String[] args) throws Exception {
            // Executor that will be used to construct new threads for consumers
            Executor executor = Executors.newCachedThreadPool();    

            // TUNABLE PARAMS
            int ringBufferSize = 1048576; // 1024, 64, 8
            int rotations = 1; // 1024, 16384, 131702

            // Construct the Disruptor
            Disruptor disruptor = new Disruptor<>(new LongEventFactory(), ringBufferSize, executor, ProducerType.SINGLE, new BusySpinWaitStrategy());

            // Connect the handler
            disruptor.handleEventsWith(new LongEventHandler());

            // Start the Disruptor, starts all threads running
            disruptor.start();

            // Get the ring buffer from the Disruptor to be used for publishing.
            RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
            LongEventProducer producer = new LongEventProducer(ringBuffer);

            long start = System.nanoTime();
            long totalIterations = rotations * ringBufferSize;
            for (long i = 0; i < totalIterations; i++) {
                producer.onData();
            }
            double duration = (System.nanoTime()-start)/ONEBILLION;
            System.out.println(String.format("Buffersize: %s, rotations: %s, total iterations = %s, duration: %.2f seconds, rate: %.2f Mops/s",
                    ringBufferSize, rotations, totalIterations, duration, totalIterations/(ONEMILLION * duration)));
        }
}

要运行,您需要简单的工厂代码

import com.lmax.disruptor.EventFactory;

public class LongEventFactory implements EventFactory<LongEvent>
{
    public LongEvent newInstance()
    {
        return new LongEvent();
    }
}

在核心 i5-2400、12GB 内存、Windows 7 上运行

样本输出

Buffersize: 1048576, rotations: 1, total iterations = 1048576, duration: 0.02 seconds, rate: 59.03 Mops/s

Buffersize: 64, rotations: 16384, total iterations = 1048576, duration: 2.01 seconds, rate: 0.52 Mops/s
4

2 回答 2

5

当生产者填满环形缓冲区时,它必须等到事件被消耗后才能继续。

当你的缓冲区正好是你要放入的元素数量时,生产者永远不必等待。它永远不会溢出。它所做的只是增加计数、索引,并在该索引处发布环形缓冲区中的数据。

当您的缓冲区较小时,它仍然只是增加计数并发布,但它的执行速度比消费者可以消耗的快。因此,生产者必须等到元素被消耗并且环形缓冲区上的空间被释放。

于 2017-07-03T21:25:00.297 回答
0

似乎问题出在这段代码中lmax\disruptor\SingleProducerSequencer

if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue)
        {
            cursor.setVolatile(nextValue);  // StoreLoad fence

            long minSequence;
            while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue)))
            {
                waitStrategy.signalAllWhenBlocking();
                LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
            }

            this.cachedValue = minSequence;
        }

特别是对LockSupport.parkNanos(1L). 这在 Windows 上最多可能需要15 毫秒。当生产者到达缓冲区的末尾并等待消费者时,就会调用它。

其次,当缓冲区较小时,很可能会发生 RingBuffer 的错误共享。我猜这两种效果都在起作用。

onData()最后,我能够在基准测试之前通过一百万次调用使用 JIT 来加速代码。这得到了最好的情况> 80Mops/sec,但并没有消除缓冲收缩带来的退化。

于 2017-07-05T21:10:19.693 回答