1

我正在尝试使用 BlockingCollection 实现一个生产者和多个消费者。创造了这样的代码

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace TestConcQueue
{
    class Program
    {
        static void Main()
        {
            Producer producer = new Producer();

            List<Task> tasks = new List<Task>();
            for (int i = 0; i < 10; i++)
            {
                tasks.Add(Task.Factory.StartNew(TaskAction, new object[] {i, producer}));
            }

            while (true)
            {
                string line = Console.ReadLine();
                if (line == "q")
                {
                    producer.Add("x");
                    break;
                }

                producer.Add(line);
            }

            Task.WaitAll(tasks.ToArray());
        }

        private static void TaskAction(object o)
        {
            object[] data = (object[])o;
            int id = (int)data[0];
            Producer producer = (Producer)data[1];

            Console.WriteLine("Task with ID " + id + " started");
            foreach (string value in producer.Consumer)
            {
                Console.WriteLine("Task with ID " + id + " consumed value " + value);
                if (value == "x")
                    break;
            }

            Console.WriteLine("Task with ID " + id + " finished");            
        }
    }


    public class Producer
    {
        private readonly ConcurrentQueue<string> _items = new ConcurrentQueue<string>();
        private readonly BlockingCollection<string> _producer;

        public Producer()
        {
            _producer = new BlockingCollection<string>(_items);
        }

        public void Add(string item)
        {
            _producer.Add(item);
        }

        public IEnumerable<string> Consumer
        {
            get { return _producer.GetConsumingEnumerable(); }
        }
    }
}

但任何时候我添加一个元素只有 1 个任务会消耗该值,所有其他的都缺少它。如何模拟多个消费者。

我不能使用 Rx 或 TPL 库。

谢谢

4

0 回答 0