我在 .NET 4.7.2 中遇到了同样的 BlockingCollection 性能问题,并找到了这篇帖子。我的场景是多生产者-多消费者(MultipleProducers-MultipleConsumers):小数据块从许多来源读取,再交给许多过滤器处理。我使用了若干个(Env.ProcessorCount 个)BlockingCollection,结果性能分析器告诉我,BlockingCollection.GetConsumingEnumerable.MoveNext()
比实际的过滤操作消耗了更多的 CPU 时间!
感谢 @Eugene Beresovsky 提供的代码。仅供参考:在我的环境中,它的耗时接近 BlockingCollection 的两倍。因此,下面是我基于 SpinLock 实现的 BlockingCollection:
/// <summary>
/// A minimal FIFO queue guarded by a <see cref="SpinLock"/>. Consumers calling
/// <see cref="Take"/> busy-wait (via <see cref="SpinWait"/>) until an item is available.
/// </summary>
/// <typeparam name="T">Type of the queued items.</typeparam>
public class BlockingCollectionSpin<T>
{
    // Deliberately not readonly: SpinLock is a mutable struct, and calling Enter/Exit
    // through a readonly field would operate on a copy.
    // The 'false' argument disables thread-owner tracking.
    private SpinLock _lock = new SpinLock(false);
    private Queue<T> _queue = new Queue<T>();

    /// <summary>Appends <paramref name="item"/> to the end of the queue.</summary>
    /// <param name="item">The item to enqueue.</param>
    public void Add(T item)
    {
        bool lockTaken = false;
        try
        {
            _lock.Enter(ref lockTaken);
            _queue.Enqueue(item);
        }
        finally
        {
            // Release only if Enter actually acquired the lock.
            if (lockTaken)
            {
                _lock.Exit(false);
            }
        }
    }

    /// <summary>
    /// Reads the item at the head of the queue without removing it.
    /// </summary>
    /// <param name="result">
    /// The head item when the queue is non-empty; otherwise <c>default(T)</c>.
    /// </param>
    /// <returns><c>true</c> if an item was available; <c>false</c> if the queue was empty.</returns>
    public bool TryPeek(out T result)
    {
        bool lockTaken = false;
        try
        {
            _lock.Enter(ref lockTaken);
            bool hasItem = _queue.Count > 0;
            result = hasItem ? _queue.Peek() : default(T);
            return hasItem;
        }
        finally
        {
            if (lockTaken)
            {
                _lock.Exit(false);
            }
        }
    }

    /// <summary>
    /// Removes and returns the item at the head of the queue, spinning until one is available.
    /// This method does not return while the queue stays empty.
    /// </summary>
    /// <returns>The dequeued item.</returns>
    public T Take()
    {
        var spinner = new SpinWait();
        while (true)
        {
            bool lockTaken = false;
            try
            {
                _lock.Enter(ref lockTaken);
                if (_queue.Count != 0)
                {
                    return _queue.Dequeue();
                }
            }
            finally
            {
                if (lockTaken)
                {
                    _lock.Exit(false);
                }
            }

            // Queue was empty: back off outside the lock before retrying.
            spinner.SpinOnce();
        }
    }
}
对于性能关键的代码,我建议避免使用 readonly 字段修饰符,
它会在 IL 中为每次字段访问增加一次检查。我使用了以下测试代码:
/// <summary>
/// Pushes <c>workAmount</c> random values through three queue types
/// (<c>BlockingCollection</c>, <c>BlockingCollectionSlim</c>, <c>BlockingCollectionSpin</c>)
/// with the calling thread as producer and several consumer tasks, prints the average
/// time per item for each, and reports if the three runs did not sum to the same total.
/// Blocks on <c>Console.ReadLine()</c> before returning.
/// </summary>
private static void TestBlockingCollections()
{
    const int workAmount = 10000000;
    var workerCount = Environment.ProcessorCount * 2;
    var timer = new Stopwatch();

    // Generate the payload up front so it is not part of any timed section.
    var items = new long[workAmount];
    var random = new Random();
    for (int idx = 0; idx < workAmount; idx++)
    {
        items[idx] = random.Next(1000000);
    }

    // Average cost of one Restart + Elapsed read; subtracted from each measurement below.
    var timerOverhead = 0.0;
    for (int idx = 0; idx < workAmount; idx++)
    {
        timer.Restart();
        timerOverhead += timer.Elapsed.TotalMilliseconds;
    }
    timerOverhead /= workAmount;

    // --- Round 1: BlockingCollection (bounded to 10000 items) ---
    // Note: workerCount - 1 consumers are started, while the totals arrays have workerCount slots.
    var totals1 = new long[workerCount];
    var blockingQueue = new BlockingCollection<long>(10000);
    var consumers = Enumerable.Range(0, workerCount - 1).Select(slot =>
        Task.Factory.StartNew(() =>
        {
            foreach (var taken in blockingQueue.GetConsumingEnumerable())
            {
                totals1[slot] += taken;
            }
        })).ToArray();
    // Pause before timing starts (presumably to let the consumer tasks spin up).
    Thread.Sleep(1000);
    timer.Restart();
    foreach (var item in items)
    {
        blockingQueue.Add(item);
    }
    blockingQueue.CompleteAdding();
    Task.WaitAll(consumers);
    var elapsed = timer.Elapsed.TotalMilliseconds - timerOverhead;
    Console.WriteLine("BlockingCollection {0:F4}ms", elapsed / workAmount);

    // --- Round 2: BlockingCollectionSlim ---
    // Items are nullable here: a null item makes a consumer leave its loop.
    var totals2 = new long[workerCount];
    var slimQueue = new BlockingCollectionSlim<long?>();
    consumers = Enumerable.Range(0, workerCount - 1).Select(slot =>
        Task.Factory.StartNew(() =>
        {
            long? taken;
            while ((taken = slimQueue.Take()).HasValue)
            {
                totals2[slot] += taken.Value;
            }
        })).ToArray();
    Thread.Sleep(1000);
    timer.Restart();
    foreach (var item in items)
    {
        slimQueue.Add(item);
    }
    // workerCount stop markers are enqueued (one more than the number of consumers started).
    for (int idx = 0; idx < workerCount; idx++)
    {
        slimQueue.Add(null);
    }
    Task.WaitAll(consumers);
    elapsed = timer.Elapsed.TotalMilliseconds - timerOverhead;
    Console.WriteLine("BlockingCollectionSlim {0:F4}ms", elapsed / workAmount);

    // --- Round 3: BlockingCollectionSpin ---
    var totals3 = new long[workerCount];
    var spinQueue = new BlockingCollectionSpin<long?>();
    consumers = Enumerable.Range(0, workerCount - 1).Select(slot =>
        Task.Factory.StartNew(() =>
        {
            long? taken;
            while ((taken = spinQueue.Take()).HasValue)
            {
                totals3[slot] += taken.Value;
            }
        })).ToArray();
    Thread.Sleep(1000);
    timer.Restart();
    foreach (var item in items)
    {
        spinQueue.Add(item);
    }
    for (int idx = 0; idx < workerCount; idx++)
    {
        spinQueue.Add(null);
    }
    Task.WaitAll(consumers);
    elapsed = timer.Elapsed.TotalMilliseconds - timerOverhead;
    Console.WriteLine("BlockingCollectionSpin {0:F4}ms", elapsed / workAmount);

    // Sanity check: every implementation must have delivered each item exactly once.
    long grandTotal1 = totals1.Sum();
    long grandTotal2 = totals2.Sum();
    long grandTotal3 = totals3.Sum();
    bool allAgree = grandTotal1 == grandTotal2 && grandTotal2 == grandTotal3;
    if (!allAgree)
    {
        Console.WriteLine("Wrong sum in the end!");
    }

    Console.ReadLine();
}
在具有 2 个内核并启用 HT 的 Core i5-3210M 上,我得到以下输出:
BlockingCollection 0.0006ms
BlockingCollectionSlim 0.0010ms(Eugene Beresovsky 实现)
BlockingCollectionSpin 0.0003ms
因此,SpinLock 版本比 .NET 自带的 BlockingCollection 快一倍。
但是,我建议只有在您确实更看重性能而不是代码的简洁性(和可维护性)时才使用它!