29

在我的公司有一段时间,我们使用了一种本地ObjectPool<T>实现,它提供了对其内容的阻止访问。它非常简单: a Queue<T>, anobject锁定,以及在AutoResetEvent添加项目时向“借用”线程发出信号。

类的肉真的是这两种方法:

public T Borrow() {
    lock (_queueLock) {
        if (_queue.Count > 0)
            return _queue.Dequeue();
    }

    _objectAvailableEvent.WaitOne();

    return Borrow();
}

public void Return(T obj) {
    lock (_queueLock) {
        _queue.Enqueue(obj);
    }

    _objectAvailableEvent.Set();
}

我们一直在使用这个和其他一些集合类,而不是提供的那些,System.Collections.Concurrent因为我们使用的是 .NET 3.5,而不是 4.0。但最近我们发现,由于我们使用的是Reactive Extensions,实际上我们确实可用的Concurrent命名空间(在 System.Threading.dll 中)。

自然地,我认为既然BlockingCollection<T>Concurrent命名空间中的核心类之一,它可能会提供比我或我的队友写的任何东西更好的性能。

所以我尝试编写一个非常简单的新实现:

public T Borrow() {
    return _blockingCollection.Take();
}

public void Return(T obj) {
    _blockingCollection.Add(obj);
}

令我惊讶的是,根据一些简单的测试(从多个线程借/返回池几千次),我们最初的实现在性能方面明显优于BlockingCollection<T>。它们似乎都可以正常工作;只是我们最初的实现似乎要快得多。

我的问题:

  1. 为什么会这样?可能是因为BlockingCollection<T>提供了更大的灵活性(我知道它通过包装一个 来工作IProducerConsumerCollection<T>),这必然会带来性能开销?
  2. 这只是对BlockingCollection<T>课程的完全错误的使用吗?
  3. 如果这是对 的适当使用BlockingCollection<T>,我只是没有正确使用吗?例如,Take/Add方法是否过于简单化,是否有更好的方法来获得相同的功能?

除非有人对第三个问题有一些见解,否则看起来我们现在将坚持我们最初的实现。

4

3 回答 3

30

这里有几个潜在的可能性。

首先,BlockingCollection<T>在 Reactive Extensions 中有一个 backport,与 .NET 4 最终版本不完全相同。如果这个 backport 的性能与 .NET 4 RTM 不同,我不会感到惊讶(尽管我没有专门分析这个集合)。许多 TPL 在 .NET 4 中的性能优于在 .NET 3.5 反向移植中。

BlockingCollection<T>话虽如此,如果您有一个生产者线程和一个消费者线程,我怀疑您的实现会表现出色。在一个生产者和一个消费者的情况下,你的锁对整体性能的影响会更小,而重置事件是消费者端等待的一种非常有效的手段。

但是,BlockingCollection<T>它旨在允许许多生产者线程很好地“排队”数据。这在您的实现中表现不佳,因为锁定争用将很快开始成为问题。

话虽如此,我还想在这里指出一个误解:

...它可能会提供比我或我的队友写的任何东西更好的性能。

这通常是不正确的。框架集合类通常执行得非常好,但通常不是给定场景的最佳性能选项。话虽如此,它们往往表现良好,同时非常灵活且非常健壮。它们通常倾向于很好地扩展。“自制”集合类在特定场景中的性能通常优于框架集合,但在用于它们专门设计的场景之外的场景时往往会出现问题。我怀疑这是其中一种情况。

于 2010-06-14T18:25:31.090 回答
14

我在 .Net 4 中尝试BlockingCollection了一个ConurrentQueue/AutoResetEvent组合(类似于 OP 的解决方案,但无锁),而后一个组合对于我的用例来说要快得多,以至于我放弃了 BlockingCollection。不幸的是,这几乎是一年前的事了,我找不到基准测试结果。

使用单独的 AutoResetEvent 不会使事情变得更加复杂。事实上,人们甚至可以一劳永逸地把它抽象成一个BlockingCollectionSlim......

BlockingCollection 在内部也依赖于 ConcurrentQueue ,但会使用苗条信号量取消令牌进行一些额外的处理,这会产生额外的功能,但即使不使用也是有代价的。还应该注意的是,BlockingCollection 没有与 ConcurrentQueue 结合,但也可以与其他实现者一起使用IProducerConsumerCollection


一个无界的、非常简单的 BlockingCollectionSlim 实现:

class BlockingCollectionSlim<T>
{
    private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
    private readonly AutoResetEvent _autoResetEvent = new AutoResetEvent(false);
    public void Add(T item)
    {
        _queue.Enqueue(item);
        _autoResetEvent.Set();
    }
    public bool TryPeek(out T result)
    {
        return _queue.TryPeek(out result);
    }
    public T Take()
    {
        T item;
        while (!_queue.TryDequeue(out item))
            _autoResetEvent.WaitOne();
        return item;
    }
    public bool TryTake(out T item, TimeSpan patience)
    {
        if (_queue.TryDequeue(out item))
            return true;
        var stopwatch = Stopwatch.StartNew();
        while (stopwatch.Elapsed < patience)
        {
            if (_queue.TryDequeue(out item))
                return true;
            var patienceLeft = (patience - stopwatch.Elapsed);
            if (patienceLeft <= TimeSpan.Zero)
                break;
            else if (patienceLeft < MinWait)
            // otherwise the while loop will degenerate into a busy loop,
            // for the last millisecond before patience runs out
                patienceLeft = MinWait;
            _autoResetEvent.WaitOne(patienceLeft);
        }
        return false;
    }
    private static readonly TimeSpan MinWait = TimeSpan.FromMilliseconds(1);
于 2015-03-26T00:39:33.730 回答
2

我在 .Net 4.7.2 中遇到了与 BlockingCollection 相同的性能问题,并找到了这篇文章。我的案例是MultipleProducers-MultipleConsumers,特别是小数据块是从许多来源读取的,应该由许多过滤器处理。使用了几个(Env.ProcessorCount)BlockingCollections,最后我得到一个性能分析器,告诉我它BlockingCollection.GetConsumingEnumerable.MoveNext()比实际过滤消耗更多的 CPU 时间!

谢谢@Eugene Beresovsky,您的代码。仅供参考:在我的环境中,它几乎比 BlockingCollection 慢两倍。所以,这是我的 SpinLocked BlockingCollection:

public class BlockingCollectionSpin<T>
{
    private SpinLock _lock = new SpinLock(false);
    private Queue<T> _queue = new Queue<T>();

    public void Add(T item)
    {
        bool gotLock = false;
        try
        {
            _lock.Enter(ref gotLock);
            _queue.Enqueue(item);
        }
        finally
        {
            if (gotLock) _lock.Exit(false);
        }
    }

    public bool TryPeek(out T result)
    {
        bool gotLock = false;
        try
        {
            _lock.Enter(ref gotLock);
            if (_queue.Count > 0)
            {
                result = _queue.Peek();
                return true;
            }
            else
            {
                result = default(T);
                return false;
            }
        }
        finally
        {
            if (gotLock) _lock.Exit(false);
        }
    }

    public T Take()
    {
        var spin = new SpinWait();
        do
        {
            bool gotLock = false;
            try
            {
                _lock.Enter(ref gotLock);
                if (_queue.Count > 0)
                    return _queue.Dequeue();
            }
            finally
            {
                if (gotLock) _lock.Exit(false);
            }
            spin.SpinOnce();
        } while (true);
    }
}

对于性能关键代码,我建议避免使用readonly字段修饰符。它添加了对 IL 中每个字段访问的检查。使用以下测试代码

private static void TestBlockingCollections()
{
    const int workAmount = 10000000;
    var workerCount = Environment.ProcessorCount * 2;
    var sw = new Stopwatch();
    var source = new long[workAmount];
    var rnd = new Random();
    for (int i = 0; i < workAmount; i++)
        source[i] = rnd.Next(1000000);

    var swOverhead = 0.0;
    for (int i = 0; i < workAmount; i++)
    {
        sw.Restart();
        swOverhead += sw.Elapsed.TotalMilliseconds;
    }
    swOverhead /= workAmount;

    var sum1 = new long[workerCount];
    var queue1 = new BlockingCollection<long>(10000);
    var workers = Enumerable.Range(0, workerCount - 1).Select(n =>
    Task.Factory.StartNew(() =>
    {
        foreach (var l in queue1.GetConsumingEnumerable())
            sum1[n] += l;
    })).ToArray();

    Thread.Sleep(1000);

    sw.Restart();
    foreach (var l in source)
        queue1.Add(l);
    queue1.CompleteAdding();
    Task.WaitAll(workers);
    var elapsed = sw.Elapsed.TotalMilliseconds - swOverhead;
    Console.WriteLine("BlockingCollection {0:F4}ms", elapsed / workAmount);

    var sum2 = new long[workerCount];
    var queue2 = new BlockingCollectionSlim<long?>();
    workers = Enumerable.Range(0, workerCount - 1).Select(n =>
    Task.Factory.StartNew(() =>
    {
        long? l;
        while ((l = queue2.Take()).HasValue)
            sum2[n] += l.Value;
    })).ToArray();

    Thread.Sleep(1000);

    sw.Restart();
    foreach (var l in source)
        queue2.Add(l);
    for (int i = 0; i < workerCount; i++)
        queue2.Add(null);
    Task.WaitAll(workers);
    elapsed = sw.Elapsed.TotalMilliseconds - swOverhead;
    Console.WriteLine("BlockingCollectionSlim {0:F4}ms", elapsed / workAmount);

    var sum3 = new long[workerCount];
    var queue3 = new BlockingCollectionSpin<long?>();
    workers = Enumerable.Range(0, workerCount - 1).Select(n =>
    Task.Factory.StartNew(() =>
    {
        long? l;
        while ((l = queue3.Take()).HasValue)
            sum3[n] += l.Value;
    })).ToArray();

    Thread.Sleep(1000);

    sw.Restart();
    foreach (var l in source)
        queue3.Add(l);
    for (int i = 0; i < workerCount; i++)
        queue3.Add(null);
    Task.WaitAll(workers);
    elapsed = sw.Elapsed.TotalMilliseconds - swOverhead;
    Console.WriteLine("BlockingCollectionSpin {0:F4}ms", elapsed/workAmount);

    if (sum1.Sum() != sum2.Sum() || sum2.Sum() != sum3.Sum())
        Console.WriteLine("Wrong sum in the end!");

    Console.ReadLine();
}

在具有 2 个内核并启用 HT 的 Core i5-3210M 上,我得到以下输出:

阻塞集合 0.0006ms
BlockingCollectionSlim 0.0010ms(Eugene Beresovsky 实现)
BlockingCollectionSpin 0.0003ms

因此,SpinLocked 版本比 .Net 快两倍BlockingCollection。但是,我建议只使用它!如果您真的更喜欢性能而不是代码简单性(和可维护性)。

于 2019-01-01T13:13:02.593 回答