166

我有一个场景,我有多个线程添加到一个队列中,并且多个线程从同一个队列中读取。如果队列达到特定大小,所有填充队列的线程将在添加时被阻塞,直到从队列中删除项目。

下面的解决方案是我现在正在使用的,我的问题是:如何改进?是否有一个对象已经在我应该使用的 BCL 中启用了这种行为?

internal class BlockingCollection<T> : CollectionBase, IEnumerable
{
    //todo: might be worth changing this into a proper QUEUE

    private AutoResetEvent _FullEvent = new AutoResetEvent(false);

    internal T this[int i]
    {
        get { return (T) List[i]; }
    }

    private int _MaxSize;
    internal int MaxSize
    {
        get { return _MaxSize; }
        set
        {
            _MaxSize = value;
            checkSize();
        }
    }

    internal BlockingCollection(int maxSize)
    {
        MaxSize = maxSize;
    }

    internal void Add(T item)
    {
        Trace.WriteLine(string.Format("BlockingCollection add waiting: {0}", Thread.CurrentThread.ManagedThreadId));

        _FullEvent.WaitOne();

        List.Add(item);

        Trace.WriteLine(string.Format("BlockingCollection item added: {0}", Thread.CurrentThread.ManagedThreadId));

        checkSize();
    }

    internal void Remove(T item)
    {
        lock (List)
        {
            List.Remove(item);
        }

        Trace.WriteLine(string.Format("BlockingCollection item removed: {0}", Thread.CurrentThread.ManagedThreadId));
    }

    protected override void OnRemoveComplete(int index, object value)
    {
        checkSize();
        base.OnRemoveComplete(index, value);
    }

    internal new IEnumerator GetEnumerator()
    {
        return List.GetEnumerator();
    }

    private void checkSize()
    {
        if (Count < MaxSize)
        {
            Trace.WriteLine(string.Format("BlockingCollection FullEvent set: {0}", Thread.CurrentThread.ManagedThreadId));
            _FullEvent.Set();
        }
        else
        {
            Trace.WriteLine(string.Format("BlockingCollection FullEvent reset: {0}", Thread.CurrentThread.ManagedThreadId));
            _FullEvent.Reset();
        }
    }
}
4

10 回答 10

206

这看起来很不安全(很少同步);怎么样:

class SizeQueue<T>
{
    private readonly Queue<T> queue = new Queue<T>();
    private readonly int maxSize;
    public SizeQueue(int maxSize) { this.maxSize = maxSize; }

    public void Enqueue(T item)
    {
        lock (queue)
        {
            while (queue.Count >= maxSize)
            {
                Monitor.Wait(queue);
            }
            queue.Enqueue(item);
            if (queue.Count == 1)
            {
                // wake up any blocked dequeue
                Monitor.PulseAll(queue);
            }
        }
    }
    public T Dequeue()
    {
        lock (queue)
        {
            while (queue.Count == 0)
            {
                Monitor.Wait(queue);
            }
            T item = queue.Dequeue();
            if (queue.Count == maxSize - 1)
            {
                // wake up any blocked enqueue
                Monitor.PulseAll(queue);
            }
            return item;
        }
    }
}

(编辑)

实际上,您需要一种关闭队列的方法,以便读者开始干净地退出 - 可能类似于 bool 标志 - 如果设置,空队列只会返回(而不是阻塞):

bool closing;
public void Close()
{
    lock(queue)
    {
        closing = true;
        Monitor.PulseAll(queue);
    }
}
public bool TryDequeue(out T value)
{
    lock (queue)
    {
        while (queue.Count == 0)
        {
            if (closing)
            {
                value = default(T);
                return false;
            }
            Monitor.Wait(queue);
        }
        value = queue.Dequeue();
        if (queue.Count == maxSize - 1)
        {
            // wake up any blocked enqueue
            Monitor.PulseAll(queue);
        }
        return true;
    }
}
于 2009-02-09T22:05:20.590 回答
60

使用 .net 4 BlockingCollection,入队使用 Add(),出队使用 Take()。它在内部使用非阻塞 ConcurrentQueue。更多信息在这里快速和最佳生产者/消费者队列技术 BlockingCollection 与并发队列

于 2011-12-09T15:22:55.397 回答
14

“这要怎么改进?”

好吧,您需要查看类中的每个方法,并考虑如果另一个线程同时调用该方法或任何其他方法会发生什么。例如,您在 Remove 方法中放置了一个锁,但没有在 Add 方法中。如果一个线程在添加另一个线程的同时删除会发生什么?坏事。

还要考虑一个方法可以返回第二个对象,该对象提供对第一个对象的内部数据的访问 - 例如,GetEnumerator。想象一个线程正在通过该枚举器,另一个线程同时正在修改列表。不好。

一个好的经验法则是通过将类中的方法数量减少到绝对最小值来简化此操作。

特别是,不要继承另一个容器类,因为您将公开该类的所有方法,为调用者提供一种破坏内部数据或查看数据的部分完整更改的方法(同样糟糕,因为数据在那一刻似乎已损坏)。隐藏所有详细信息,并对您如何允许访问它们完全无情。

我强烈建议您使用现成的解决方案 - 获取有关线程的书或使用 3rd 方库。否则,鉴于您正在尝试的内容,您将长时间调试代码。

此外,Remove 返回一个项目(例如,第一个添加的项目,因为它是一个队列)而不是调用者选择特定项目不是更有意义吗?当队列为空时,也许 Remove 也应该阻塞。

更新:马克的回答实际上实现了所有这些建议!:) 但我将把它留在这里,因为它可能有助于理解为什么他的版本如此改进。

于 2009-02-09T22:51:03.727 回答
13

您可以在 System.Collections.Concurrent 命名空间中使用BlockingCollectionConcurrentQueue

 public class ProducerConsumerQueue<T> : BlockingCollection<T>
{
    /// <summary>
    /// Initializes a new instance of the ProducerConsumerQueue, Use Add and TryAdd for Enqueue and TryEnqueue and Take and TryTake for Dequeue and TryDequeue functionality
    /// </summary>
    public ProducerConsumerQueue()  
        : base(new ConcurrentQueue<T>())
    {
    }

  /// <summary>
  /// Initializes a new instance of the ProducerConsumerQueue, Use Add and TryAdd for Enqueue and TryEnqueue and Take and TryTake for Dequeue and TryDequeue functionality
  /// </summary>
  /// <param name="maxSize"></param>
    public ProducerConsumerQueue(int maxSize)
        : base(new ConcurrentQueue<T>(), maxSize)
    {
    }



}
于 2013-02-06T10:39:38.103 回答
6

我刚刚使用 Reactive Extensions 解决了这个问题并记住了这个问题:

public class BlockingQueue<T>
{
    private readonly Subject<T> _queue;
    private readonly IEnumerator<T> _enumerator;
    private readonly object _sync = new object();

    public BlockingQueue()
    {
        _queue = new Subject<T>();
        _enumerator = _queue.GetEnumerator();
    }

    public void Enqueue(T item)
    {
        lock (_sync)
        {
            _queue.OnNext(item);
        }
    }

    public T Dequeue()
    {
        _enumerator.MoveNext();
        return _enumerator.Current;
    }
}

不一定完全安全,但非常简单。

于 2010-05-07T14:14:36.027 回答
5

这就是我为线程安全的有界阻塞队列而来的操作。

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

public class BlockingBuffer<T>
{
    private Object t_lock;
    private Semaphore sema_NotEmpty;
    private Semaphore sema_NotFull;
    private T[] buf;

    private int getFromIndex;
    private int putToIndex;
    private int size;
    private int numItems;

    public BlockingBuffer(int Capacity)
    {
        if (Capacity <= 0)
            throw new ArgumentOutOfRangeException("Capacity must be larger than 0");

        t_lock = new Object();
        buf = new T[Capacity];
        sema_NotEmpty = new Semaphore(0, Capacity);
        sema_NotFull = new Semaphore(Capacity, Capacity);
        getFromIndex = 0;
        putToIndex = 0;
        size = Capacity;
        numItems = 0;
    }

    public void put(T item)
    {
        sema_NotFull.WaitOne();
        lock (t_lock)
        {
            while (numItems == size)
            {
                Monitor.Pulse(t_lock);
                Monitor.Wait(t_lock);
            }

            buf[putToIndex++] = item;

            if (putToIndex == size)
                putToIndex = 0;

            numItems++;

            Monitor.Pulse(t_lock);

        }
        sema_NotEmpty.Release();


    }

    public T take()
    {
        T item;

        sema_NotEmpty.WaitOne();
        lock (t_lock)
        {

            while (numItems == 0)
            {
                Monitor.Pulse(t_lock);
                Monitor.Wait(t_lock);
            }

            item = buf[getFromIndex++];

            if (getFromIndex == size)
                getFromIndex = 0;

            numItems--;

            Monitor.Pulse(t_lock);

        }
        sema_NotFull.Release();

        return item;
    }
}
于 2009-05-28T12:55:36.713 回答
2

我还没有完全探索过TPL,但他们可能有一些适合你需要的东西,或者至少有一些 Reflector 素材可以从中获得一些灵感。

希望有帮助。

于 2009-02-09T22:11:23.913 回答
1

从 .NET 5.0/Core 3.0 开始,您可以使用System.Threading.Channels这篇文章(.NET (C#) 中的异步生产者消费者模式)中的
基准测试显示,与 BlockingCollection 相比,速度显着提升!

于 2020-11-19T16:30:17.017 回答
0

好吧,你可以看看System.Threading.Semaphore课堂。除此之外 - 不,你必须自己做。AFAIK 没有这样的内置集合。

于 2009-02-09T22:05:22.133 回答
-1

如果你想要最大的吞吐量,允许多个读者阅读并且只有一个作者可以写,BCL 有一个叫做 ReaderWriterLockSlim 的东西,它可以帮助你精简你的代码......

于 2009-02-09T22:04:32.213 回答