我知道使用 ConcurrentQueue 的 BlockingCollection 的有界容量为 100。
但是我不确定这意味着什么。
我正在尝试实现一个并发缓存,如果队列大小太大,它可以出队,可以在一个操作中出队/入队(即缓存溢出时松散的消息)。有没有办法为此使用有限容量,或者手动执行此操作或创建新集合更好。
基本上我有一个阅读线程和几个写作线程。如果队列中的数据是所有作者中“最新的”,我会很高兴。
我知道使用 ConcurrentQueue 的 BlockingCollection 的有界容量为 100。
但是我不确定这意味着什么。
我正在尝试实现一个并发缓存,如果队列大小太大,它可以出队,可以在一个操作中出队/入队(即缓存溢出时松散的消息)。有没有办法为此使用有限容量,或者手动执行此操作或创建新集合更好。
基本上我有一个阅读线程和几个写作线程。如果队列中的数据是所有作者中“最新的”,我会很高兴。
N 的有限容量意味着如果队列已经包含 N 个项目,则任何尝试添加另一个项目的线程都将阻塞,直到另一个线程删除一个项目。
您似乎想要的是一个不同的概念-您希望最近添加的项目成为消费线程出列的第一个项目。
您可以通过对底层存储使用 aConcurrentStack
而不是 ConcurrentQueue 来实现这一点。
您将使用this constructor
并传入一个ConcurrentStack
.
例如:
var blockingCollection = new BlockingCollection<int>(new ConcurrentStack<int>());
通过使用ConcurrentStack
,您可以确保消费线程出列的每个项目都是当时队列中最新鲜的项目。
另请注意,如果您为阻塞集合指定上限,则可以使用BlockingCollection.TryAdd()
which 将false
在您调用它时集合已满时返回。
在我看来,您正在尝试构建诸如 MRU(最近使用的)缓存之类的东西。BlockingCollection
不是最好的方法。
我建议您使用LinkedList。它不是线程安全的,因此您必须提供自己的同步,但这并不太难。您的入队方法如下所示:
LinkedList<MyType> TheQueue = new LinkedList<MyType>();
object listLock = new object();
void Enqueue(MyType item)
{
lock (listLock)
{
TheQueue.AddFirst(item);
while (TheQueue.Count > MaxQueueSize)
{
// Queue overflow. Reduce to max size.
TheQueue.RemoveLast();
}
}
}
出队更容易:
MyType Dequeue()
{
lock (listLock)
{
return (TheQueue.Count > 0) ? TheQueue.RemoveLast() : null;
}
}
如果您希望消费者在队列上进行非忙碌等待,则涉及更多。你可以用Monitor.Wait
和来做Monitor.Pulse
。有关示例,请参见Monitor.Pulse页面上的示例。
更新:
我突然想到你可以用循环缓冲区(数组)做同样的事情。只需保持头部和尾部指针。您在 at 插入head
和删除tail
。如果你去插入,和head == tail
,那么你需要增加tail
,这有效地删除了前tail
一项。
如果您想要一个BlockingCollection
包含 N 个最新元素的自定义,并在其已满时删除最旧的元素,您可以基于Channel<T>
. Channels旨在用于异步场景,但让它们阻塞消费者是微不足道的,并且不应该导致任何不需要的副作用(如死锁),即使在SynchronizationContext
已安装的环境中使用也是如此。
public class MostRecentBlockingCollection<T>
{
private readonly Channel<T> _channel;
public MostRecentBlockingCollection(int capacity)
{
_channel = Channel.CreateBounded<T>(new BoundedChannelOptions(capacity)
{
FullMode = BoundedChannelFullMode.DropOldest,
});
}
public bool IsCompleted => _channel.Reader.Completion.IsCompleted;
public void Add(T item)
=> _channel.Writer.WriteAsync(item).AsTask().GetAwaiter().GetResult();
public T Take()
=> _channel.Reader.ReadAsync().AsTask().GetAwaiter().GetResult();
public void CompleteAdding() => _channel.Writer.Complete();
public IEnumerable<T> GetConsumingEnumerable()
{
while (_channel.Reader.WaitToReadAsync().AsTask().GetAwaiter().GetResult())
while (_channel.Reader.TryRead(out var item))
yield return item;
}
}
该类MostRecentBlockingCollection
仅阻止消费者。生产者总是可以在集合中添加项目,导致(可能)一些以前添加的元素被删除。
添加取消支持应该很简单,因为Channel<T>
API 已经支持它。添加对超时的支持不那么简单,但应该不是很难做到。