这是一个BlockingCollection
用于管理简单工作队列的示例。
当工作线程完成当前项目时,它将从工作队列中删除一个新项目,处理该项目,然后将其添加到输出队列。
一个单独的消费者线程从输出队列中删除已完成的项目并对其进行处理。
最后,我们必须等待所有工作人员完成(Task.WaitAll(workers)
),然后才能将输出队列标记为已完成(outputQueue.CompleteAdding()
)。
这个例子只有工作项的整数;在实际代码中,您将使用封装工作的对象。
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace Demo
{
class Program
{
static void Main(string[] args)
{
new Program().run();
}
void run()
{
int threadCount = 4;
Task[] workers = new Task[threadCount];
Task.Factory.StartNew(consumer);
for (int i = 0; i < threadCount; ++i)
{
int workerId = i;
Task task = new Task(() => worker(workerId));
workers[i] = task;
task.Start();
}
for (int i = 0; i < 100; ++i)
{
Console.WriteLine("Queueing work item {0}", i);
inputQueue.Add(i);
Thread.Sleep(50);
}
Console.WriteLine("Stopping adding.");
inputQueue.CompleteAdding();
Task.WaitAll(workers);
outputQueue.CompleteAdding();
Console.WriteLine("Done.");
Console.ReadLine();
}
void worker(int workerId)
{
Console.WriteLine("Worker {0} is starting.", workerId);
foreach (var workItem in inputQueue.GetConsumingEnumerable())
{
Console.WriteLine("Worker {0} is processing item {1}", workerId, workItem);
Thread.Sleep(100); // Simulate work.
outputQueue.Add(workItem); // Output completed item.
}
Console.WriteLine("Worker {0} is stopping.", workerId);
}
void consumer()
{
Console.WriteLine("Consumer is starting.");
foreach (var workItem in outputQueue.GetConsumingEnumerable())
{
Console.WriteLine("Consumer is using item {0}", workItem);
Thread.Sleep(25);
}
Console.WriteLine("Consumer is finished.");
}
BlockingCollection<int> inputQueue = new BlockingCollection<int>();
BlockingCollection<int> outputQueue = new BlockingCollection<int>();
}
}
Plinq 和数据流
您还应该查看 Plinq,并且 - 如果您可以使用 .Net 4.5 - 还有Dataflow (Task Parallel Library)