0

我想要一种队列,其中单个源在其中输入数据,另一方面,消费者将等待,当他们检测到队列不为空时将开始执行数据,直到它们停止。但重要的是,如果队列被清空,他们仍将继续观察队列,以便如果有更多数据弹出,他们将能够使用它。我发现了多个消费者和多个生产者,因为消费者嵌套在生产者中,在我的情况下我不能这样做,因为我将有一个单一的来源和消费者承诺到队列中,直到我停止它们。因此不是串联,但消费者和生产者都是并行执行的。

将通过以下方式并行执行消费者和生产者

Parallel.Invoke(() => producer(), () => consumers());

这样的问题是我将如何执行有时并行为空的队列的内容

4

1 回答 1

0

您可以使用BlockingCollection<T>.

您可以将其中一个用作队列,并将对它的引用传递给producer()和每个consumers().

GetConsumingEnumerable()将从每个消费者线程调用,并将其与foreach.

生产者线程会将项目添加到集合中,并CompleteAdding()在完成生产时调用。这将自动使所有消费者线程退出它们的 foreach 循环。

这是一个基本示例(没有错误处理)。调用Thread.Sleep()是为了模拟负载,不应在实际代码中使用。

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace Demo
{
    internal class Program
    {
        private static void Main(string[] args)
        {
            ThreadPool.SetMinThreads(10, 0); // To help the demo; not needed in real code.
            var plant = new ProcessingPlant();
            plant.Process();
            Console.WriteLine("Work complete.");
        }
    }

    public sealed class ProcessingPlant
    {
        private readonly BlockingCollection<string> _queue = new BlockingCollection<string>();

        public void Process()
        {
            Parallel.Invoke(producer, consumers);
        }

        private void producer()
        {
            for (int i = 0; i < 100; ++i)
            {
                string item = i.ToString();
                Console.WriteLine("Producer is queueing {0}", item);
               _queue.Add(item);  // <- Here's where we add an item to the queue.
                Thread.Sleep(0);
            }

            _queue.CompleteAdding(); // <- Here's where we make all the consumers
        }                            //    exit their foreach loops.

        private void consumers()
        {
            Parallel.Invoke(
                () => consumer(1),
                () => consumer(2),
                () => consumer(3),
                () => consumer(4),
                () => consumer(5)
            );
        }

        private void consumer(int id)
        {
            Console.WriteLine("Consumer {0} is starting.", id);

            foreach (var item in _queue.GetConsumingEnumerable()) // <- Here's where we remove items.
            {
                Console.WriteLine("Consumer {0} read {1}", id, item);
                Thread.Sleep(0);
            }

            Console.WriteLine("Consumer {0} is stopping.", id);
        }
    }
}

(我知道这是使用一个额外的线程来启动消费者,但我这样做是为了避免掩盖真正的观点 - 这是为了演示 BlockingCollection 的使用。)

于 2013-03-30T00:57:29.437 回答