我正在尝试使用 BlockingCollection 实现一个生产者和多个消费者的模型,并编写了如下代码:
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace TestConcQueue
{
/// <summary>
/// Console demo: one producer (the main thread, fed by standard input) and
/// several consumer tasks that all read from the same <see cref="Producer"/>.
/// </summary>
class Program
{
    /// <summary>
    /// Number of consumer tasks started by <see cref="Main"/>. The same number
    /// of "x" sentinels is queued on shutdown so that every consumer receives one.
    /// </summary>
    private const int ConsumerCount = 10;

    /// <summary>
    /// Starts the consumers, forwards each line typed on the console to the
    /// producer, and shuts everything down when "q" is entered or input ends.
    /// </summary>
    static void Main()
    {
        Producer producer = new Producer();
        List<Task> tasks = new List<Task>();
        for (int i = 0; i < ConsumerCount; i++)
        {
            // Each consumer blocks for the lifetime of the program, so ask for a
            // dedicated thread instead of tying up a thread-pool worker.
            tasks.Add(Task.Factory.StartNew(
                TaskAction,
                new object[] { i, producer },
                TaskCreationOptions.LongRunning));
        }

        while (true)
        {
            string line = Console.ReadLine();

            // ReadLine returns null at end of input (closed or redirected stdin);
            // treat that like "q" instead of queueing null forever.
            if (line == null || line == "q")
            {
                // Every queued item is taken by exactly one consumer, and a
                // consumer stops after its first "x". One sentinel per consumer
                // is therefore required, otherwise WaitAll below never returns.
                for (int i = 0; i < ConsumerCount; i++)
                {
                    producer.Add("x");
                }
                break;
            }

            // Note: a literal "x" typed by the user is indistinguishable from the
            // shutdown sentinel and stops whichever consumer receives it.
            producer.Add(line);
        }

        Task.WaitAll(tasks.ToArray());
    }

    /// <summary>
    /// Consumer loop executed by each task.
    /// </summary>
    /// <param name="o">
    /// An <c>object[]</c> holding the consumer id (<c>int</c>) at index 0 and the
    /// shared <see cref="Producer"/> at index 1.
    /// </param>
    private static void TaskAction(object o)
    {
        object[] data = (object[])o;
        int id = (int)data[0];
        Producer producer = (Producer)data[1];

        Console.WriteLine("Task with ID " + id + " started");
        foreach (string value in producer.Consumer)
        {
            Console.WriteLine("Task with ID " + id + " consumed value " + value);
            if (value == "x")
            {
                // Shutdown sentinel: stop consuming.
                break;
            }
        }
        Console.WriteLine("Task with ID " + id + " finished");
    }
}
/// <summary>
/// Thin wrapper around a FIFO <see cref="BlockingCollection{T}"/> of strings.
/// </summary>
/// <remarks>
/// Consumers compete for items: each added item is removed from the queue by
/// whichever enumeration of <see cref="Consumer"/> takes it first, so it is
/// delivered to exactly one consumer rather than broadcast to all of them.
/// </remarks>
public class Producer : IDisposable
{
    // Backed explicitly by a ConcurrentQueue so items are handed out in FIFO order.
    private readonly BlockingCollection<string> _producer;

    /// <summary>Creates an empty, unbounded producer.</summary>
    public Producer()
    {
        _producer = new BlockingCollection<string>(new ConcurrentQueue<string>());
    }

    /// <summary>Queues an item for consumption.</summary>
    /// <param name="item">The value to enqueue.</param>
    /// <exception cref="InvalidOperationException">
    /// <see cref="CompleteAdding"/> has already been called.
    /// </exception>
    public void Add(string item)
    {
        _producer.Add(item);
    }

    /// <summary>
    /// Marks the producer as finished. Once the remaining items are drained,
    /// every enumeration of <see cref="Consumer"/> ends instead of blocking.
    /// </summary>
    public void CompleteAdding()
    {
        _producer.CompleteAdding();
    }

    /// <summary>
    /// A blocking sequence that removes and yields items as they become
    /// available. It ends only after <see cref="CompleteAdding"/> has been
    /// called and the queue is empty.
    /// </summary>
    public IEnumerable<string> Consumer
    {
        get { return _producer.GetConsumingEnumerable(); }
    }

    /// <summary>Releases the resources held by the underlying collection.</summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>Releases the underlying collection when disposing.</summary>
    /// <param name="disposing">
    /// <c>true</c> when called from <see cref="Dispose()"/>.
    /// </param>
    protected virtual void Dispose(bool disposing)
    {
        if (disposing)
        {
            _producer.Dispose();
        }
    }
}
}
但每次我添加一个元素时,只有 1 个任务会消费该值,其他任务都收不到它。该如何模拟多个消费者(让每个消费者都能收到每个元素)?
我不能使用 Rx 或 TPL 库。
谢谢