从 .NET 4.0 开始,您可以使用另外两个(以及 IMO,更清洁)选项。
首先是使用CountdownEvent
类。它避免了必须自己处理递增和递减的需要:
int tasks = <however many tasks you're performing>;
// Dispose when done.
using (var e = new CountdownEvent(tasks))
{
// Queue work.
ThreadPool.QueueUserWorkItem(() => {
// Do work
...
// Signal when done.
e.Signal();
});
// Wait till the countdown reaches zero.
e.Wait();
}
但是,还有一个更强大的解决方案,那就是使用Task
class,如下所示:
// The source of your work items, create a sequence of Task instances.
Task[] tasks = Enumerable.Range(0, 100).Select(i =>
// Create task here.
Task.Factory.StartNew(() => {
// Do work.
}
// No signalling, no anything.
).ToArray();
// Wait on all the tasks.
Task.WaitAll(tasks);
使用Task
类和调用WaitAll
更干净,IMO,因为您在整个代码中编织的线程原语更少(注意,没有等待句柄);您不必设置计数器,处理递增/递减,您只需设置您的任务,然后等待它们。这让代码更能表达你想要做什么而不是如何做的原语(至少在管理它的并行化方面)。
.NET 4.5 提供了更多选项,您可以通过调用类上的静态方法Task
来简化实例序列的生成:Run
Task
// The source of your work items, create a sequence of Task instances.
Task[] tasks = Enumerable.Range(0, 100).Select(i =>
// Create task here.
Task.Run(() => {
// Do work.
})
// No signalling, no anything.
).ToArray();
// Wait on all the tasks.
Tasks.WaitAll(tasks);
或者,您可以利用TPL DataFlow 库(它位于System
命名空间中,因此它是官方的,即使它是从 NuGet 下载的,如 Entity Framework)并使用ActionBlock<TInput>
,如下所示:
// Create the action block. Since there's not a non-generic
// version, make it object, and pass null to signal, or
// make T the type that takes the input to the action
// and pass that.
var actionBlock = new ActionBlock<object>(o => {
// Do work.
});
// Post 100 times.
foreach (int i in Enumerable.Range(0, 100)) actionBlock.Post(null);
// Signal complete, this doesn't actually stop
// the block, but says that everything is done when the currently
// posted items are completed.
actionBlock.Complete();
// Wait for everything to complete, the Completion property
// exposes a Task which can be waited on.
actionBlock.Completion.Wait();
请注意,ActionBlock<TInput>
默认情况下一次处理一项,因此如果您想让它一次处理多个操作,您必须通过传递ExecutionDataflowBlockOptions
实例并设置MaxDegreeOfParallelism
属性来设置要在构造函数中处理的并发项数:
var actionBlock = new ActionBlock<object>(o => {
// Do work.
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });
如果您的操作确实是线程安全的,那么您可以将MaxDegreeOfParallelsim
属性设置为DataFlowBlockOptions.Unbounded
:
var actionBlock = new ActionBlock<object>(o => {
// Do work.
}, new ExecutionDataflowBlockOptions {
MaxDegreeOfParallelism = DataFlowBlockOptions.Unbounded
});
关键是,您可以细粒度地控制您希望选项的并行程度。
当然,如果您有一系列想要传递到您的ActionBlock<TInput>
实例中的项目,那么您可以链接一个ISourceBlock<TOutput>
实现来提供ActionBlock<TInput>
,如下所示:
// The buffer block.
var buffer = new BufferBlock<int>();
// Create the action block. Since there's not a non-generic
// version, make it object, and pass null to signal, or
// make T the type that takes the input to the action
// and pass that.
var actionBlock = new ActionBlock<int>(o => {
// Do work.
});
// Link the action block to the buffer block.
// NOTE: An IDisposable is returned here, you might want to dispose
// of it, although not totally necessary if everything works, but
// still, good housekeeping.
using (link = buffer.LinkTo(actionBlock,
// Want to propagate completion state to the action block.
new DataflowLinkOptions {
PropagateCompletion = true,
},
// Can filter on items flowing through if you want.
i => true)
{
// Post 100 times to the *buffer*
foreach (int i in Enumerable.Range(0, 100)) buffer.Post(i);
// Signal complete, this doesn't actually stop
// the block, but says that everything is done when the currently
// posted items are completed.
actionBlock.Complete();
// Wait for everything to complete, the Completion property
// exposes a Task which can be waited on.
actionBlock.Completion.Wait();
}
根据您需要做什么,TPL Dataflow 库成为一个更具吸引力的选择,因为它可以处理链接在一起的所有任务的并发性,并且它允许您非常具体地了解您希望每个部分的并行度,同时为每个块保持适当的关注点分离。