Disruptor 应该比 BlockingCollection 快得多。
在我之前的问题“为什么我的 Disruptor 示例如此缓慢?”中,我写了两个测试:Disruptor
花费了大约 1 微秒(或更短),而 BlockingCollection 花费了大约 14 微秒。
所以我决定在我的程序中使用 Disruptor,但是当我实现它之后,我发现现在 Disruptor
花费大约 50 微秒,而 BlockingCollection 仍然花费 14-18 微秒。
我已将我的生产代码改写成一个“独立测试”,但 Disruptor
仍然花费 50 微秒。为什么?
下面是一个简化的测试。在这个测试中,我有两个选项。第一个选项是休眠 1 毫秒(Sleep for 1 ms),
此时 Disruptor 需要 30-50 微秒来传递消息。第二个选项是模拟活动,
此时 Disruptor 需要 7 微秒来传递消息。同样的测试改用 BlockingCollection,
得到的结果是 14-18 微秒。那么为什么 Disruptor 不比 BlockingCollection 快呢?
在我的实际应用程序中,Disruptor
花费 50 微秒来传递消息,这实在太慢了!我期望它传递消息的速度应该比 1 微秒快得多。
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Disruptor;
namespace DisruptorTest
{
/// <summary>
/// Event type held by the ring buffer; it carries a single integer identifier.
/// </summary>
public sealed class ValueEntry
{
    // Explicit backing store for the Id property.
    private int _id;

    /// <summary>
    /// Identifier written by the publisher before the entry is published.
    /// </summary>
    internal int Id
    {
        get { return _id; }
        set { _id = value; }
    }
}
/// <summary>
/// Micro-benchmark that measures the time between publishing an event to the
/// ring buffer (<see cref="Do"/>) and the event reaching
/// <see cref="MyHandler.OnNext"/>, and prints a running average.
/// </summary>
class DisruptorTest
{
    /// <summary>
    /// Event handler that stops the parent's stopwatch when an event arrives
    /// and folds the measured delay into the parent's running average.
    /// </summary>
    public class MyHandler : IEventHandler<ValueEntry>
    {
        private readonly DisruptorTest _parent;

        /// <param name="parent">Benchmark instance whose stopwatch and counters are updated.</param>
        public MyHandler(DisruptorTest parent)
        {
            this._parent = parent;
        }

        /// <summary>
        /// Records the delay of one delivered event.
        /// </summary>
        /// <param name="data">The delivered entry (not inspected here).</param>
        /// <param name="sequence">Ring-buffer sequence of the entry (unused).</param>
        /// <param name="endOfBatch">Batch marker supplied by the Disruptor (unused).</param>
        public void OnNext(ValueEntry data, long sequence, bool endOfBatch)
        {
            // NOTE(review): the stopwatch is shared with the publishing code in Do()
            // without synchronization; presumably the pause between publishes in
            // Run() is what keeps Restart() and Stop() from overlapping - confirm.
            _parent.sw.Stop();
            long microseconds = TicksToMicroseconds(_parent.sw.ElapsedTicks);
            // Filter out abnormal delays > 1000
            if (microseconds < 1000)
            {
                _parent.sum += (int)microseconds;
                _parent.count++;
                if (_parent.count % 1000 == 0)
                {
                    Console.WriteLine("average disruptor delay (microseconds) = {0}", _parent.sum / _parent.count);
                }
            }
        }
    }

    private RingBuffer<ValueEntry> _ringBuffer;
    private const int RingSize = 64;

    static void Main(string[] args)
    {
        new DisruptorTest().Run();
    }

    /// <summary>
    /// Starts the disruptor with a single <see cref="MyHandler"/> and publishes
    /// 10001 events, pausing after each one.
    /// </summary>
    public void Run()
    {
        var disruptor = new Disruptor.Dsl.Disruptor<ValueEntry>(() => new ValueEntry(), RingSize, TaskScheduler.Default);
        disruptor.HandleEventsWith(new MyHandler(this));
        _ringBuffer = disruptor.Start();
        for (int i = 0; i < 10001; i++)
        {
            Do();
            // We need to simulate activity to allow event to deliver
            // Option1. just Sleep. Result 30-50 microseconds.
            Thread.Sleep(1);
            // Option2. Do something. Result ~7 microseconds.
            //factorial = 1;
            //for (int j = 1; j < 100000; j++)
            //{
            // factorial *= j;
            //}
        }
    }

    public static int factorial;
    private Stopwatch sw = Stopwatch.StartNew();
    private int sum;
    private int count;

    /// <summary>
    /// Claims the next ring-buffer slot, fills it, restarts the stopwatch and
    /// publishes the slot.
    /// </summary>
    public void Do()
    {
        long sequenceNo = _ringBuffer.Next();
        _ringBuffer[sequenceNo].Id = 0;
        // Restart immediately before Publish so the measurement covers only the hand-off.
        sw.Restart();
        _ringBuffer.Publish(sequenceNo);
    }

    /// <summary>
    /// Converts a Stopwatch tick count to whole microseconds.
    /// </summary>
    /// <remarks>
    /// Multiplies before dividing. The previous form,
    /// <c>ticks / (Stopwatch.Frequency / 1000000)</c>, truncated the divisor,
    /// which skews results when the frequency is not a multiple of 1 MHz and
    /// divides by zero when the frequency is below 1 MHz.
    /// </remarks>
    private static long TicksToMicroseconds(long ticks)
    {
        return ticks * 1000000L / Stopwatch.Frequency;
    }
}
}
旧代码。现在应该忽略:
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Disruptor;
namespace DisruptorTest
{
/// <summary>
/// Event type held by the ring buffer; it carries a single integer identifier.
/// </summary>
public sealed class ValueEntry
{
    // Explicit backing store for the Id property.
    private int _id;

    /// <summary>
    /// Identifier written by the publisher before the entry is published and
    /// read back by the handler to select the matching stopwatch.
    /// </summary>
    internal int Id
    {
        get { return _id; }
        set { _id = value; }
    }
}
/// <summary>
/// Earlier version of the latency benchmark: each published event carries an
/// id in [0, MaxId), and delay statistics are tracked per id using one
/// stopwatch, one sum and one count per id.
/// </summary>
class DisruptorTest
{
    /// <summary>
    /// Event handler that processes only the sequences assigned to its ordinal
    /// (<c>sequence % consumers == ordinal</c>) and records their delay.
    /// </summary>
    public class MyHandler : IEventHandler<ValueEntry>
    {
        private readonly int _ordinal;
        private readonly int _consumers;
        private readonly DisruptorTest _parent;

        /// <param name="ordinal">Index of this handler among all handlers.</param>
        /// <param name="consumers">Total number of handlers sharing the sequence space.</param>
        /// <param name="parent">Benchmark instance whose stopwatches and counters are updated.</param>
        public MyHandler(int ordinal, int consumers, DisruptorTest parent)
        {
            _ordinal = ordinal;
            _consumers = consumers;
            this._parent = parent;
        }

        /// <summary>
        /// Records the delay of one delivered event when its sequence belongs to this handler.
        /// </summary>
        /// <param name="data">The delivered entry; its Id selects the per-id statistics.</param>
        /// <param name="sequence">Ring-buffer sequence of the entry.</param>
        /// <param name="endOfBatch">Batch marker supplied by the Disruptor (unused).</param>
        public void OnNext(ValueEntry data, long sequence, bool endOfBatch)
        {
            // Skip sequences that belong to another handler.
            if ((sequence % _consumers) != _ordinal)
            {
                return;
            }

            var id = data.Id;
            // NOTE(review): the stopwatch for this id is shared with the publishing
            // code in Do() without synchronization - confirm this is acceptable.
            _parent.sw[id].Stop();
            long microseconds = TicksToMicroseconds(_parent.sw[id].ElapsedTicks);
            // filter out abnormal delays > 1000
            if (microseconds < 1000)
            {
                _parent.sum[id] += (int)microseconds;
                _parent.count[id]++;
                if (_parent.count[id] % 10 == 0)
                {
                    Console.WriteLine("Id = {0} average disruptor delay (microseconds) = {1}",
                        id, _parent.sum[id] / _parent.count[id]);
                }
            }
        }
    }

    private const int NumberOfThreads = 1;
    private RingBuffer<ValueEntry> _ringBuffer;
    private const int RingSize = 64;

    static void Main(string[] args)
    {
        new DisruptorTest().Run();
    }

    /// <summary>
    /// Registers <c>NumberOfThreads</c> handlers, creates the per-id stopwatches,
    /// starts the disruptor and publishes 1000 events with ids cycling through
    /// [0, MaxId), sleeping 1 ms after each publish.
    /// </summary>
    public void Run()
    {
        var disruptor = new Disruptor.Dsl.Disruptor<ValueEntry>(() => new ValueEntry(), RingSize, TaskScheduler.Default);
        for (int i = 0; i < NumberOfThreads; i++)
        {
            disruptor.HandleEventsWith(new MyHandler(i, NumberOfThreads, this));
        }
        for (int i = 0; i < sw.Length; i++)
        {
            sw[i] = Stopwatch.StartNew();
        }
        _ringBuffer = disruptor.Start();
        //var rnd = new Random();
        for (int i = 0; i < 1000; i++)
        {
            //Do(rnd.Next(MaxId));
            Do(i % MaxId);
            Thread.Sleep(1);
        }
    }

    private const int MaxId = 100;
    private Stopwatch[] sw = new Stopwatch[MaxId];
    private int[] sum = new int[MaxId];
    private int[] count = new int[MaxId];

    /// <summary>
    /// Claims the next ring-buffer slot, stores <paramref name="id"/> in it,
    /// restarts the stopwatch for that id and publishes the slot.
    /// </summary>
    /// <param name="id">Index into the per-id statistics arrays; must be in [0, MaxId).</param>
    public void Do(int id)
    {
        long sequenceNo = _ringBuffer.Next();
        _ringBuffer[sequenceNo].Id = id;
        // Restart immediately before Publish so the measurement covers only the hand-off.
        sw[id].Restart();
        _ringBuffer.Publish(sequenceNo);
    }

    /// <summary>
    /// Converts a Stopwatch tick count to whole microseconds.
    /// </summary>
    /// <remarks>
    /// Multiplies before dividing. The previous form,
    /// <c>ticks / (Stopwatch.Frequency / 1000000)</c>, truncated the divisor,
    /// which skews results when the frequency is not a multiple of 1 MHz and
    /// divides by zero when the frequency is below 1 MHz.
    /// </remarks>
    private static long TicksToMicroseconds(long ticks)
    {
        return ticks * 1000000L / Stopwatch.Frequency;
    }
}
}
输出:
......
Id = 91 average disruptor delay (microseconds) = 50
Id = 92 average disruptor delay (microseconds) = 48
Id = 93 average disruptor delay (microseconds) = 35
Id = 94 average disruptor delay (microseconds) = 35
Id = 95 average disruptor delay (microseconds) = 51
Id = 96 average disruptor delay (microseconds) = 55
Id = 97 average disruptor delay (microseconds) = 38
Id = 98 average disruptor delay (microseconds) = 37
Id = 99 average disruptor delay (microseconds) = 45