我需要围绕固定大小的 FIFO 队列实现生产者/消费者模式。我认为围绕 ConcurrentQueue 的包装类可能适用于此,但我不完全确定(而且我以前从未使用过 ConcurrentQueue)。这里的转折是队列只需要保存固定数量的项目(在我的例子中是字符串)。我的应用程序将有一个生产者任务/线程和一个消费者任务/线程。当我的消费者任务运行时,它需要及时将队列中存在的所有项目出列并处理它们。
对于它的价值,我的消费者处理排队的项目只不过是通过 SOAP 将它们上传到不是 100% 可靠的网络应用程序。如果无法建立连接或调用 SOAP 调用失败,我应该丢弃这些项目并返回队列以获取更多信息。由于 SOAP 的开销,我试图最大化队列中可以在一个 SOAP 调用中发送的项目数。
有时,我的生产者添加项目的速度可能比我的消费者删除和处理它们的速度要快。如果队列已经满了并且我的生产者需要添加另一个项目,我需要将新项目排入队列,然后将最旧的项目出队,以便队列的大小保持固定。基本上,我需要始终保留队列中生成的最新项目(即使这意味着某些项目没有被消耗,因为我的消费者当前正在处理以前的项目)。
关于生产者保持队列中的项目数量固定,我从这个问题中发现了一个潜在的想法:
我目前正在使用 Enqueue() 方法围绕 ConcurrentQueue 使用包装类(基于该答案),如下所示:
public class FixedSizeQueue<T>
{
readonly ConcurrentQueue<T> queue = new ConcurrentQueue<T>();
public int Size { get; private set; }
public FixedSizeQueue(int size)
{
Size = size;
}
public void Enqueue(T obj)
{
// add item to the queue
queue.Enqueue(obj);
lock (this) // lock queue so that queue.Count is reliable
{
while (queue.Count > Size) // if queue count > max queue size, then dequeue an item
{
T objOut;
queue.TryDequeue(out objOut);
}
}
}
}
我在队列上创建了一个具有大小限制的类的实例,如下所示:
FixedSizeQueue<string> incomingMessageQueue = new FixedSizeQueue<string>(10); // 10 item limit
我启动了我的生产者任务,它开始填充队列。当添加项目导致队列计数超过最大大小时,我的 Enqueue() 方法中的代码似乎可以正常工作,以从队列中删除最旧的项目。现在我需要我的消费者任务来使项目出列并处理它们,但这是我的大脑感到困惑的地方。为我的消费者实现 Dequeue 方法的最佳方法是什么,该方法将在某个时刻拍摄队列的快照并将所有项目出列以进行处理(在此过程中,生产者可能仍在将项目添加到队列中)?