这可以通过TPL 数据流库轻松实现。
首先,假设您有一个BufferBlock<T>
,这是您的队列:
var queue = new BufferBlock<T>();
然后,您需要对块执行的操作,这由ActionBlock<T>
类表示:
var action = new ActionBlock<T>(t => { /* Process t here */ },
new ExecutionDataflowBlockOptions {
// Number of concurrent tasks.
MaxDegreeOfParallelism = ...,
});
请注意上面的构造函数,它需要一个实例ExecutionDataflowBlockOptions
并将MaxDegreeOfParallelism
属性设置为您希望同时处理的多个并发项目。
在表面之下,Task Parallel Library 用于处理为任务分配线程等。TPL Dataflow 旨在成为更高级别的抽象,它允许您调整所需的并行度/节流/等。
例如,如果您不希望缓冲任何项目ActionBlock<TInput>
(希望它们存在于 中)BufferBlock<T>
,您还可以设置BoundedCapacity
属性ActionBlock<TInput>
正在处理的项目,以及保留的项目):
var action = new ActionBlock<T>(t => { /* Process t here */ },
new ExecutionDataflowBlockOptions {
// Number of concurrent tasks.
MaxDegreeOfParallelism = ...,
// Set to MaxDegreeOfParallelism to not buffer.
BoundedCapacity ...,
});
此外,如果您想要一个新的、新鲜的Task<TResult>
实例来处理每个项目,那么您可以将MaxMessagesPerTask
属性设置为一个,表示每个都Task<TResult>
将处理一个项目:
var action = new ActionBlock<T>(t => { /* Process t here */ },
new ExecutionDataflowBlockOptions {
// Number of concurrent tasks.
MaxDegreeOfParallelism = ...,
// Set to MaxDegreeOfParallelism to not buffer.
BoundedCapacity ...,
// Process once item per task.
MaxMessagesPerTask = 1,
});
请注意,根据您的应用程序正在运行的其他任务数量,这可能对您来说是最佳的,也可能不是最佳的,您可能还需要考虑为通过ActionBlock<TInput>
.
从那里开始,通过调用方法BufferBlock<T>
将 链接到是一件简单的事情:ActionBlock<TInput>
LinkTo
IDisposable connection = queue.LinkTo(action, new DataflowLinkOptions {
PropagateCompletion = true;
});
您在此处将该PropogateCompletion
属性设置为 true ,以便在等待 时ActionBlock<T>
,将完成发送到ActionBlock<T>
您可能随后等待的(如果/当没有更多项目要处理时)。
请注意,如果您希望删除块之间的链接,您可以在调用返回的接口实现上调用该Dispose
方法。IDisposable
LinkTo
Post
最后,您使用以下方法将项目发布到缓冲区:
queue.Post(new T());
当你完成(如果你曾经完成),你调用Complete
方法:
queue.Complete();
然后,在操作块上,您可以通过等待属性Task
公开的实例等到它完成:Completion
action.Completion.Wait();
希望它的优雅是显而易见的:
- 您不必管理新
Task
实例/线程/等的创建来管理工作,这些块会根据您提供的设置为您完成(这是基于每个块的)。
- 更清晰的关注点分离。缓冲区与动作分离,所有其他块也是如此。您构建块,然后将它们链接在一起。