7

我需要单生产者、单消费者 FIFO 查询,因为

  • 我需要按收到的顺序处理消息。
  • 我需要异步执行此操作,因为调用者在我处理消息时不应等待。
  • 只有在前一个消息处理完成时才应该开始下一个消息处理。有时“接收”消息的频率高于“处理”消息的频率。但平均而言,我应该能够处理所有消息,只是有时我必须将它们“排队”包。

所以我认为这很像 TCP/IP,你有一个生产者和一个消费者,有时你接收消息的速度比你处理的快,所以你必须查询它们。订单在哪里很重要,而呼叫者对你用这些东西做什么完全不感兴趣。

这听起来很简单,我可能可以使用 general Queue,但我想使用BlockingCollection它,因为我不想用ManualResetEventetc编写任何代码。

有多适合BlockingCollection我的任务,您可能还可以提出其他建议吗?

4

2 回答 2

12

BlockingCollection类实现了 IProducerConsumerCollection接口,非常适合您的要求。

您可以创建两个任务,一个用于异步生产者,另一个作为消费者工作者。前者将添加项目,BlockingCollection而后者只是在新的以 FIFO 顺序可用时立即消耗。

使用TPL 任务BlockingCollection的生产者-消费者示例应用程序:

class ProducerConsumer
{
    private static BlockingCollection<string> queue = new BlockingCollection<string>();

    static void Main(string[] args)
    {
        Start();
    }

    public static void Start()
    {
        var producerWorker = Task.Factory.StartNew(() => RunProducer());
        var consumerWorker = Task.Factory.StartNew(() => RunConsumer());

        Task.WaitAll(producerWorker, consumerWorker);
    }

    private static void RunProducer()
    {
        int itemsCount = 100;

        while (itemsCount-- > 0)
        {
            queue.Add(itemsCount + " - " + Guid.NewGuid().ToString());
            Thread.Sleep(250);
        }
    }

    private static void RunConsumer()
    {
        foreach (var item in queue.GetConsumingEnumerable())
        {
           Console.WriteLine(DateTime.Now.ToString("HH:mm:ss.ffff") + " | " + item);
        }
    }
}

IProducerConsumerCollection

定义用于操作用于生产者/消费者使用的线程安全集合的方法。该接口为生产者/消费者集合提供了统一的表示,以便更高级别的抽象如 System.Collections.Concurrent.BlockingCollection(Of T) 可以将集合用作底层存储机制。

于 2012-04-11T09:36:33.190 回答
0

既然是你需要的队列,为什么不坚持一个队列呢?您可以使用同步队列

于 2012-04-11T09:17:57.140 回答