6

我需要围绕固定大小的 FIFO 队列实现生产者/消费者模式。我认为围绕 ConcurrentQueue 的包装类可能适用于此,但我不完全确定(而且我以前从未使用过 ConcurrentQueue)。这里的转折是队列只需要保存固定数量的项目(在我的例子中是字符串)。我的应用程序将有一个生产者任务/线程和一个消费者任务/线程。当我的消费者任务运行时,它需要及时将队列中存在的所有项目出列并处理它们。

对于它的价值,我的消费者处理排队的项目只不过是通过 SOAP 将它们上传到不是 100% 可靠的网络应用程序。如果无法建立连接或调用 SOAP 调用失败,我应该丢弃这些项目并返回队列以获取更多信息。由于 SOAP 的开销,我试图最大化队列中可以在一个 SOAP 调用中发送的项目数。

有时,我的生产者添加项目的速度可能比我的消费者删除和处理它们的速度要快。如果队列已经满了并且我的生产者需要添加另一个项目,我需要将新项目排入队列,然后将最旧的项目出队,以便队列的大小保持固定。基本上,我需要始终保留队列中生成的最新项目(即使这意味着某些项目没有被消耗,因为我的消费者当前正在处理以前的项目)。

关于生产者保持队列中的项目数量固定,我从这个问题中发现了一个潜在的想法:

固定大小的队列,在新入队时自动将旧值出列

我目前正在使用 Enqueue() 方法围绕 ConcurrentQueue 使用包装类(基于该答案),如下所示:

public class FixedSizeQueue<T>
{
    readonly ConcurrentQueue<T> queue = new ConcurrentQueue<T>();

    public int Size { get; private set; }

    public FixedSizeQueue(int size)
    {
        Size = size;
    }

    public void Enqueue(T obj)
    {
        // add item to the queue
        queue.Enqueue(obj);

        lock (this) // lock queue so that queue.Count is reliable
        {
            while (queue.Count > Size) // if queue count > max queue size, then dequeue an item
            {
                T objOut;
                queue.TryDequeue(out objOut);
            }
        }
    }
}

我在队列上创建了一个具有大小限制的类的实例,如下所示:

FixedSizeQueue<string> incomingMessageQueue = new FixedSizeQueue<string>(10); // 10 item limit

我启动了我的生产者任务,它开始填充队列。当添加项目导致队列计数超过最大大小时,我的 Enqueue() 方法中的代码似乎可以正常工作,以从队列中删除最旧的项目。现在我需要我的消费者任务来使项目出列并处理它们,但这是我的大脑感到困惑的地方。为我的消费者实现 Dequeue 方法的最佳方法是什么,该方法将在某个时刻拍摄队列的快照并将所有项目出列以进行处理(在此过程中,生产者可能仍在将项目添加到队列中)?

4

2 回答 2

7

简单地说,ConcurrentQueue 有一个“ToArray”方法,当输入该方法时,它将锁定集合并生成队列中所有当前项目的“快照”。如果您希望为您的消费者提供一个工作块,您可以锁定入队方法具有的同一个对象,调用 ToArray(),然后while(!queue.IsEmpty) queue.TryDequeue(out trash)在返回您提取的数组之前通过循环清除队列。

这将是你的GetAll()方法:

public T[] GetAll()
{
    lock (syncObj) // so that we don't clear items we didn't get with ToArray()
    {
        var result = queue.ToArray();
        T trash;
        while(!queue.IsEmpty) queue.TryDequeue(out trash);
    }
}

由于您必须清除队列,因此您可以简单地将这两个操作结合起来;创建一个适当大小的数组(使用 queue.Count),然后当队列不为空时,将一个项目出队并将其放入数组中,然后返回。

现在,这就是具体问题的答案。我现在必须凭良心戴上我的 CodeReview.SE 帽子并指出一些事情:

  • 永远不要使用lock(this). 您永远不知道其他对象可能会将您的对象用作锁定焦点,因此当对象从内部锁定自身时会被阻止。最好的做法是锁定一个私有范围的对象实例,通常是为了被锁定而创建的:private readonly object syncObj = new object();

  • 由于无论如何您都在锁定包装器的关键部分,因此我将使用普通List<T>集合而不是并发集合。访问速度更快,更容易清理,因此您将能够比 ConcurrentQueue 更简单地完成您正在做的事情。要入队,请在索引零之前锁定同步对象 Insert(),然后使用 RemoveRange() 将索引 Size 中的任何项目删除到列表的当前 Count。要出列,锁定同一个同步对象,调用 myList.ToArray()(来自 Linq 命名空间;与 ConcurrentQueue 的作用几乎相同),然后在返回数组之前调用 myList.Clear()。再简单不过了:

    public class FixedSizeQueue<T>
    {
    private readonly List<T> queue = new List<T>();
    private readonly object syncObj = new object();
    
    public int Size { get; private set; }
    
    public FixedSizeQueue(int size) { Size = size; }
    
    public void Enqueue(T obj)
    {
        lock (syncObj)
        {
            queue.Insert(0,obj)
            if(queue.Count > Size) 
               queue.RemoveRange(Size, Count-Size);
        }
    }
    
    public T[] Dequeue()
    {
        lock (syncObj)
        {
            var result = queue.ToArray();
            queue.Clear();
            return result;
        }
    }
    }
    
  • 您似乎明白您正在使用此模型丢弃排队的项目。这通常不是一件好事,但我愿意给你怀疑的好处。但是,我会说有一种无损方法可以使用 BlockingCollection 来实现这一点。BlockingCollection 包装了任何 IProducerConsumerCollection,包括大多数 System.Collections.Concurrent 类,并允许您指定队列的最大容量。然后,该集合将阻止任何试图从空队列中出列的线程,或任何试图添加到完整队列的线程,直到添加或删除项目,以便有一些东西可以获取或插入空间。这是实现具有最大大小的生产者-消费者队列的最佳方式,或者需要“轮询”以查看是否存在 是供消费者使用的东西。如果你走这条路,只有消费者必须扔掉的东西才会被扔掉;消费者将看到生产者放入的所有行,并对每一行做出自己的决定。

于 2012-09-13T16:55:35.800 回答
5

您不想使用lockwith this。请参阅为什么 lock(this) {...} 不好?更多细节。

这段代码

// if queue count > max queue size, then dequeue an item
while (queue.Count > Size) 
{
    T objOut;
    queue.TryDequeue(out objOut);
}

建议您需要以某种方式等待通知消费者该商品的可用性。在这种情况下,请考虑BlockingCollection<T>改用。

于 2012-09-13T16:43:02.147 回答