0

Disruptor 应该比 BlockingCollection 快得多。

在我之前的问题中,为什么我的破坏者示例如此缓慢?我写了两个测试。Disruptor花费了大约 1 微秒(或更短),而 BlockingCollection 花费了大约 14 微秒。

所以我决定Disruptor在我的程序中使用它,但是当我实现它时,我发现现在Disruptor花费大约50微秒,而 BlockingCollection 仍然花费14-18微秒。

我已将我的生产代码修改为“独立测试”,Disruptor但仍花费 50 微秒。为什么?

下面是一个简化的测试。在这个测试中,我有两个选择。第一个选项是Sleep for 1 ms。然后Disruptor花费 30-50 微秒来交付。第二个选项是模拟活动。然后Disruptor花费 7 微秒来交付。相同的测试BlockingCollection在 14-18 微秒内得出结果。那么为什么 Disruptor 不比 BlockingCollection 快呢?

在我的实际应用程序中Disruptor花费 50 微秒来交付太多的东西!我希望它传递消息的速度应该比 1 微秒快得多。

using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Disruptor;

namespace DisruptorTest
{
    public sealed class ValueEntry
    {
        internal int Id { get; set; }
    }

    class DisruptorTest
    {

        public class MyHandler : IEventHandler<ValueEntry>
        {
            private DisruptorTest _parent;

            public MyHandler(DisruptorTest parent)
            {
                this._parent = parent;
            }

            public void OnNext(ValueEntry data, long sequence, bool endOfBatch)
            {
                _parent.sw.Stop();
                long microseconds = _parent.sw.ElapsedTicks / (Stopwatch.Frequency / (1000L * 1000L));

                // Filter out abnormal delays > 1000
                if (microseconds < 1000)
                {
                    _parent.sum += (int)microseconds;
                    _parent.count++;
                    if (_parent.count % 1000 == 0)
                    {
                        Console.WriteLine("average disruptor delay (microseconds) = {0}", _parent.sum / _parent.count);
                    }
                }
            }
        }

        private RingBuffer<ValueEntry> _ringBuffer;
        private const int RingSize = 64;

        static void Main(string[] args)
        {
            new DisruptorTest().Run();
        }

        public void Run()
        {
            var disruptor = new Disruptor.Dsl.Disruptor<ValueEntry>(() => new ValueEntry(), RingSize, TaskScheduler.Default);
            disruptor.HandleEventsWith(new MyHandler(this));

            _ringBuffer = disruptor.Start();

            for (int i = 0; i < 10001; i++)
            {
                Do();

                // We need to simulate activity to allow event to deliver

                // Option1. just Sleep. Result 30-50 microseconds.
                Thread.Sleep(1);

                // Option2. Do something. Result ~7 microseconds.
                //factorial = 1;
                //for (int j = 1; j < 100000; j++)
                //{
                //    factorial *= j;
                //}
            }
        }

        public static int factorial;

        private Stopwatch sw = Stopwatch.StartNew();
        private int sum;
        private int count;

        public void Do()
        {
            long sequenceNo = _ringBuffer.Next();
            _ringBuffer[sequenceNo].Id = 0;
            sw.Restart();
            _ringBuffer.Publish(sequenceNo);
        }

    }
}

旧代码。现在应该忽略:

using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Disruptor;

namespace DisruptorTest
{
    public sealed class ValueEntry
    {
        internal int Id { get; set; }
    }

    class DisruptorTest
    {

        public class MyHandler : IEventHandler<ValueEntry>
        {
            private readonly int _ordinal;
            private readonly int _consumers;
            private DisruptorTest _parent;

            public MyHandler(int ordinal, int consumers, DisruptorTest parent)
            {
                _ordinal = ordinal;
                _consumers = consumers;
                this._parent = parent;
            }

            public void OnNext(ValueEntry data, long sequence, bool endOfBatch)
            {
                if ((sequence % _consumers) == _ordinal)
                {
                    var id = data.Id;
                    _parent.sw[id].Stop();
                    long microseconds = _parent.sw[id].ElapsedTicks / (Stopwatch.Frequency / (1000L * 1000L));
                    // filter out abnormal delays > 1000
                    if (microseconds < 1000)
                    {
                        _parent.sum[id] += (int)microseconds;
                        _parent.count[id]++;
                        if (_parent.count[id] % 10 == 0)
                        {
                            Console.WriteLine("Id = {0} average disruptor delay (microseconds) = {1}",
                                id, _parent.sum[id] / _parent.count[id]);
                        }
                    }
                }
            }
        }

        private const int NumberOfThreads = 1;
        private RingBuffer<ValueEntry> _ringBuffer;
        private const int RingSize = 64;

        static void Main(string[] args)
        {
            new DisruptorTest().Run();
        }

        public void Run()
        {
            var disruptor = new Disruptor.Dsl.Disruptor<ValueEntry>(() => new ValueEntry(), RingSize, TaskScheduler.Default);
            for (int i = 0; i < NumberOfThreads; i++)
                disruptor.HandleEventsWith(new MyHandler(i, NumberOfThreads, this));

            for (int i = 0; i < sw.Length; i++)
            {
                sw[i] = Stopwatch.StartNew();
            }

            _ringBuffer = disruptor.Start();

            //var rnd = new Random();
            for (int i = 0; i < 1000; i++)
            {
                //Do(rnd.Next(MaxId));
                Do(i % MaxId);
                Thread.Sleep(1);
            }
        }

        private const int MaxId = 100;

        private Stopwatch[] sw = new Stopwatch[MaxId];
        private int[] sum = new int[MaxId];
        private int[] count = new int[MaxId];

        public void Do(int id)
        {
            long sequenceNo = _ringBuffer.Next();
            _ringBuffer[sequenceNo].Id = id;
            sw[id].Restart();
            _ringBuffer.Publish(sequenceNo);
        }

    }
}

输出:

......
Id = 91 average disruptor delay (microseconds) = 50
Id = 92 average disruptor delay (microseconds) = 48
Id = 93 average disruptor delay (microseconds) = 35
Id = 94 average disruptor delay (microseconds) = 35
Id = 95 average disruptor delay (microseconds) = 51
Id = 96 average disruptor delay (microseconds) = 55
Id = 97 average disruptor delay (microseconds) = 38
Id = 98 average disruptor delay (microseconds) = 37
Id = 99 average disruptor delay (microseconds) = 45
4

2 回答 2

4

您仍在做同样的事情:您正在测量发布单个项目需要多少时间。

public void Do(int id)
{
    long sequenceNo = _ringBuffer.Next();
    _ringBuffer[sequenceNo].Id = id;
    sw[id].Restart(); // <--- You're doing this EVERY TIME YOU PUBLISH an item!
    _ringBuffer.Publish(sequenceNo);
}

在您之前的问题中,您被告知您应该测量数千个发布,以便正确利用Stopwatch精度。

此外,您仍在测试过程中写入控制台。避免这样做:

if (_parent.count[id] % 10 == 0)
{
    Console.WriteLine("Id = {0} average disruptor delay (microseconds) = {1}",
        id, _parent.sum[id] / _parent.count[id]);
}

清理你的代码

至少,你应该试着清理一下你的代码;我已经重新组织了一下,所以它不是那么混乱: http: //pastie.org/5382971

Disrputors 一开始并不那么简单,现在我们必须处理您的代码并尝试告诉您如何修复它。更重要的是:当你有意大利面条代码时,你不能进行性能优化或测试。尽量保持一切简单和干净。在这个阶段,你的代码既不简单也不干净。

让我们从私有成员变量的可怕命名约定开始:

private const int NumberOfThreads = 1;
private RingBuffer<ValueEntry> _ringBuffer;
private const int RingSize = 64;
private const int MaxId = 100
private Stopwatch[] sw = new Stopwatch[MaxId];
private int[] sum = new int[MaxId];
private int[] count = new int[MaxId];

始终如一:

private const int _numberOfThreads = 1;
private RingBuffer<ValueEntry> _ringBuffer;
private const int _ringSize = 64;
private const int _maxId = 100
private Stopwatch[] _sw = new Stopwatch[MaxId];
private int[] _sum = new int[MaxId];
private int[] _count = new int[MaxId];

其他一些指针:

  • 摆脱嵌套类。
  • 将 main 移出一个单独的类(例如 Program)。

建立一个好的测试

Martin 和 Michael 告诉您的第一件事是性能测试也必须非常好,因此他们花费了相当多的时间来构建测试框架

  • 我建议您尝试几百万个事件,而不是 1000 个事件。
  • 确保您只为所有事件使用一个计时器。
  • 当您开始处理项目时启动计时器并在没有更多项目要处理时停止它。
  • 了解何时完成处理项目的一种有效方法是使用CountDownEvent.

更新

所以让我们先解决第一个争议:秒表的精度确实应该足够了。

Int64 frequency = Stopwatch.Frequency;
Console.WriteLine( "  Timer frequency in ticks per second = {0}", frequency );
Int64 nanosecPerTick = (1000L * 1000L * 1000L) / frequency;
Console.WriteLine( "  Timer is accurate within {0} nanoseconds", nanosecPerTick );

在我的机器上,分辨率在 320 纳秒内。所以OP是正确的,定时器的分辨率不应该是一个问题。

我知道 OP 想要测量平均一件物品的交付,但是(至少)有两种方法可以做到这一点。

我们必须调查差异。在概念层面上,您正在做与以下代码完全相同的事情:

  1. 你正在运行一堆迭代。
  2. 测量它们中的每一个。
  3. 你计算总数。
  4. 最后计算平均值。

在代码中:

Stopwatch sw = new Stopwatch();
long totalMicroseconds = 0;
int numItems = 1000;
for(int i = 0; i < numItems; i++)
{
    sw.Reset();
    sw.Start();
    OneItemDelivery();
    sw.Stop();
    totalMicroseconds += sw.ElapsedTicks / (Stopwatch.Frequency / (1000L * 1000L));
}
long avgOneItemDelivery = totalMicroseconds/numItems;

衡量性能的另一种方法是:

  1. 启动计时器。
  2. 运行所有迭代。
  3. 停止计时器。
  4. 计算平均时间。

在代码中:

sw.Start();
for(int i = 0; i < numItems; i++)
{
    OneItemDelivery();    
}
sw.Stop();
totalMicroseconds = sw.ElapsedTicks / (Stopwatch.Frequency / (1000L * 1000L));
long avgOneItemDelivery = totalMicroseconds/numItems;

每个人都可能有自己的问题:

  • 第一种方法可能不太精确,您需要在您的系统上证明秒表可以精确处理这么少的工作(除了简单地计算纳秒精度)。
  • 第二种方法还将包括发生迭代所需的计算时间。这会在您的测量中引入少量偏差,但它可以解决您通常会在第一种方法中看到的精度问题。

您已经注意到Sleep语句会产生较低的性能,因此我建议您进行简单的计算。计算阶乘似乎是个好主意,只需进行非常小的计算即可:不需要 100000,100 也应该没问题。

当然,您不需要等待 2 分钟进行测试,但 10-20 秒应该不是问题。

于 2012-11-15T15:07:39.097 回答
1

我阅读了您编写的 BlockingCollection 代码,为什么我的破坏者示例如此缓慢?, 你在 Disruptor 里加了很多Console.WriteLine,但在 BlockingCollection 里没有,Console.WriteLine很慢,里面有锁。

RingBufferSize的太小了,这会影响性能,应该是1024或更大。

并且while (!dataItems.IsCompleted)可能有一些问题,BlockCollection 总是处于添加状态,它会导致线程提前结束。

Task.Factory.StartNew(() => {
    while (!dataItems.IsCompleted)
    {

        ValueEntry ve = null;
        try
        {
    ve = dataItems.Take();
    long microseconds = sw[ve.Value].ElapsedTicks / (Stopwatch.Frequency / (1000L * 1000L));
    results[ve.Value] = microseconds;

    //Console.WriteLine("elapsed microseconds = " + microseconds);
    //Console.WriteLine("Event handled: Value = {0} (processed event {1}", ve.Value, ve.Value);
        }
        catch (InvalidOperationException) { }
    }
}, TaskCreationOptions.LongRunning);


for (int i = 0; i < length; i++)
{
    var valueToSet = i;

    ValueEntry entry = new ValueEntry();
    entry.Value = valueToSet;

    sw[i].Restart();
    dataItems.Add(entry);

    //Console.WriteLine("Published entry {0}, value {1}", valueToSet, entry.Value);
    //Thread.Sleep(1000);
}

我已经重写了你的代码,Disruptor 比具有多个生产者(10 个并行生产者)的 BlockingCollection 快 10 倍,比具有单个生产者的 BlockingCollection 快 2 倍:

using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Disruptor;
using Disruptor.Dsl;
using NUnit.Framework;

namespace DisruptorTest.Ds
{
    public sealed class ValueEntry
    {
        internal int Id { get; set; }
    }

    class MyHandler : IEventHandler<ValueEntry>
    {
        public void OnEvent(ValueEntry data, long sequence, bool endOfBatch)
        {
        }
    }

    [TestFixture]
    public class DisruptorPerformanceTest
    {
        private volatile bool collectionAddEnded;

        private int producerCount = 10;
        private int runCount = 1000000;
        private int RingBufferAndCapacitySize = 1024;

        [TestCase()]
        public async Task TestBoth()
        {
            for (int i = 0; i < 1; i++)
            {
                foreach (var rs in new int[] {64, 512, 1024, 2048 /*,4096,4096*2*/})
                {
                    Console.WriteLine($"RingBufferAndCapacitySize:{rs}, producerCount:{producerCount}, runCount:{runCount} of {i}");
                    RingBufferAndCapacitySize = rs;
                    await DisruptorTest();
                    await BlockingCollectionTest();
                }
            }
        }

        [TestCase()]
        public async Task BlockingCollectionTest()
        {
            var sw = new Stopwatch();
            BlockingCollection<ValueEntry> dataItems = new BlockingCollection<ValueEntry>(RingBufferAndCapacitySize);

            sw.Start();

            collectionAddEnded = false;

            // A simple blocking consumer with no cancellation.
            var task = Task.Factory.StartNew(() =>
            {
                while (!collectionAddEnded && !dataItems.IsCompleted)
                {
                    //if (!dataItems.IsCompleted && dataItems.TryTake(out var ve))
                    if (dataItems.TryTake(out var ve))
                    {
                    }
                }
            }, TaskCreationOptions.LongRunning);


            var tasks = new Task[producerCount];
            for (int t = 0; t < producerCount; t++)
            {
                tasks[t] = Task.Run(() =>
                {
                    for (int i = 0; i < runCount; i++)
                    {
                        ValueEntry entry = new ValueEntry();
                        entry.Id = i;
                        dataItems.Add(entry);
                    }
                });
            }

            await Task.WhenAll(tasks);

            collectionAddEnded = true;
            await task;

            sw.Stop();

            Console.WriteLine($"BlockingCollectionTest Time:{sw.ElapsedMilliseconds/1000d}");
        }


        [TestCase()]
        public async Task DisruptorTest()
        {
            var disruptor =
                new Disruptor.Dsl.Disruptor<ValueEntry>(() => new ValueEntry(), RingBufferAndCapacitySize, TaskScheduler.Default,
                    producerCount > 1 ? ProducerType.Multi : ProducerType.Single, new BlockingWaitStrategy());
            disruptor.HandleEventsWith(new MyHandler());

            var _ringBuffer = disruptor.Start();

            Stopwatch sw = Stopwatch.StartNew();

            sw.Start();


            var tasks = new Task[producerCount];
            for (int t = 0; t < producerCount; t++)
            {
                tasks[t] = Task.Run(() =>
                {
                    for (int i = 0; i < runCount; i++)
                    {
                        long sequenceNo = _ringBuffer.Next();
                        _ringBuffer[sequenceNo].Id = 0;
                        _ringBuffer.Publish(sequenceNo);
                    }
                });
            }


            await Task.WhenAll(tasks);


            disruptor.Shutdown();

            sw.Stop();
            Console.WriteLine($"DisruptorTest Time:{sw.ElapsedMilliseconds/1000d}s");
        }
    }
}

具有共享 ValueEntry 实例的 BlockingCollectionTest(for 循环中没有新的 ValueEntry())

  • RingBufferAndCapacitySize:64, producerCount:10, runCount:1000000 of 0

    干扰器测试时间:16.962s

    BlockingCollection测试时间:18.399

  • RingBufferAndCapacitySize:512, producerCount:10, runCount:1000000 of 0 DisruptorTest Time:6.101s

    BlockingCollection测试时间:19.526

  • RingBufferAndCapacitySize:1024, producerCount:10, runCount:1000000 of 0

    干扰器测试时间:2.928s

    BlockingCollection测试时间:20.25

  • RingBufferAndCapacitySize:2048, producerCount:10, runCount:1000000 of 0

    干扰器测试时间:2.448s

    BlockingCollection测试时间:20.649

BlockingCollectionTest 在 for 循环中创建一个新的 ValueEntry()

  • RingBufferAndCapacitySize:64, producerCount:10, runCount:1000000 of 0

    干扰器测试时间:27.374s

    BlockingCollection测试时间:21.955

  • RingBufferAndCapacitySize:512, producerCount:10, runCount:1000000 of 0

    干扰器测试时间:5.011s

    BlockingCollection测试时间:20.127

  • RingBufferAndCapacitySize:1024, producerCount:10, runCount:1000000 of 0

    干扰器测试时间:2.877s

    BlockingCollection测试时间:22.656

  • RingBufferAndCapacitySize:2048, producerCount:10, runCount:1000000 of 0

    干扰器测试时间:2.384s

    BlockingCollection测试时间:23.567

https://www.cnblogs.com/darklx/p/11755686.html

于 2019-10-28T06:37:57.167 回答