6

我知道使用 ConcurrentQueue 的 BlockingCollection 的有界容量为 100。

但是我不确定这意味着什么。

我正在尝试实现一个并发缓存,如果队列大小太大,它可以出队,可以在一个操作中出队/入队(即缓存溢出时松散的消息)。有没有办法为此使用有限容量,或者手动执行此操作或创建新集合更好。

基本上我有一个阅读线程和几个写作线程。如果队列中的数据是所有作者中“最新的”,我会很高兴。

4

3 回答 3

3

N 的有限容量意味着如果队列已经包含 N 个项目,则任何尝试添加另一个项目的线程都将阻塞,直到另一个线程删除一个项目。

您似乎想要的是一个不同的概念-您希望最近添加的项目成为消费线程出列的第一个项目。

您可以通过对底层存储使用 aConcurrentStack而不是 ConcurrentQueue 来实现这一点。

您将使用this constructor并传入一个ConcurrentStack.

例如:

var blockingCollection = new BlockingCollection<int>(new ConcurrentStack<int>());

通过使用ConcurrentStack,您可以确保消费线程出列的每个项目都是当时队列中最新鲜的项目。

另请注意,如果您为阻塞集合指定上限,则可以使用BlockingCollection.TryAdd()which 将false在您调用它时集合已满时返回。

于 2013-04-10T14:44:49.413 回答
2

在我看来,您正在尝试构建诸如 MRU(最近使用的)缓存之类的东西。BlockingCollection不是最好的方法。

我建议您使用LinkedList。它不是线程安全的,因此您必须提供自己的同步,但这并不太难。您的入队方法如下所示:

LinkedList<MyType> TheQueue = new LinkedList<MyType>();
object listLock = new object();

void Enqueue(MyType item)
{
    lock (listLock)
    {
        TheQueue.AddFirst(item);
        while (TheQueue.Count > MaxQueueSize)
        {
            // Queue overflow. Reduce to max size.
            TheQueue.RemoveLast();
        }
    }
}

出队更容易:

MyType Dequeue()
{
    lock (listLock)
    {
        return (TheQueue.Count > 0) ? TheQueue.RemoveLast() : null;
    }
}

如果您希望消费者在队列上进行非忙碌等待,则涉及更多。你可以用Monitor.Wait和来做Monitor.Pulse。有关示例,请参见Monitor.Pulse页面上的示例。

更新:

我突然想到你可以用循环缓冲区(数组)做同样的事情。只需保持头部和尾部指针。您在 at 插入head和删除tail。如果你去插入,和head == tail,那么你需要增加tail,这有效地删除了前tail一项。

于 2013-04-10T17:53:01.357 回答
2

如果您想要一个BlockingCollection包含 N 个最新元素的自定义,并在其已满时删除最旧的元素,您可以基于Channel<T>. Channels旨在用于异步场景,但让它们阻塞消费者是微不足道的,并且不应该导致任何不需要的副作用(如死锁),即使在SynchronizationContext已安装的环境中使用也是如此。

public class MostRecentBlockingCollection<T>
{
    private readonly Channel<T> _channel;

    public MostRecentBlockingCollection(int capacity)
    {
        _channel = Channel.CreateBounded<T>(new BoundedChannelOptions(capacity)
        {
            FullMode = BoundedChannelFullMode.DropOldest,
        });
    }

    public bool IsCompleted => _channel.Reader.Completion.IsCompleted;

    public void Add(T item)
        => _channel.Writer.WriteAsync(item).AsTask().GetAwaiter().GetResult();

    public T Take()
        => _channel.Reader.ReadAsync().AsTask().GetAwaiter().GetResult();

    public void CompleteAdding() => _channel.Writer.Complete();

    public IEnumerable<T> GetConsumingEnumerable()
    {
        while (_channel.Reader.WaitToReadAsync().AsTask().GetAwaiter().GetResult())
            while (_channel.Reader.TryRead(out var item))
                yield return item;
    }
}

该类MostRecentBlockingCollection仅阻止消费者。生产者总是可以在集合中添加项目,导致(可能)一些以前添加的元素被删除。

添加取消支持应该很简单,因为Channel<T>API 已经支持它。添加对超时的支持不那么简单,但应该不是很难做到。

于 2020-12-31T03:46:56.547 回答