编辑:原来我错了。TransformBlock
确实以它们进入的顺序返回项目,即使它被配置为并行。因此,我原始答案中的代码完全没用,TransformBlock
可以使用正常代替。
原答案:
据我所知,.Net 中只有一个并行结构支持按照它们进入的顺序返回已处理的项目:PLINQ with AsOrdered()
. 但在我看来,PLINQ 并不适合你想要的。
另一方面,我认为 TPL Dataflow 非常适合,但它没有一个块可以支持并行性和同时按顺序返回项目(TransformBlock
支持它们两者,但不是同时)。幸运的是,Dataflow 块的设计考虑了可组合性,因此我们可以构建自己的块来实现这一点。
但首先,我们必须弄清楚如何对结果进行排序。像您建议的那样,使用并发字典以及一些同步机制肯定会起作用。但我认为有一个更简单的解决方案:使用Task
s 的队列。在输出任务中,您将 a 出列Task
,等待它完成(异步),当它完成时,您将其结果一起发送。当队列为空时,我们仍然需要一些同步,但是如果我们巧妙地选择使用哪个队列,我们可以免费获得。
所以,一般的想法是这样的:我们写的是一个IPropagatorBlock
,有一些输入和一些输出。创建自定义的最简单方法IPropagatorBlock
是创建一个处理输入的块,另一个块产生结果并将它们视为一个使用DataflowBlock.Encapsulate()
.
输入块必须以正确的顺序处理传入的项目,因此那里没有并行化。它将创建一个新的Task
(实际上是 a TaskCompletionSource
,以便我们可以设置Task
后者的结果),将其添加到队列中,然后发送该项目进行处理,以及一些设置正确结果的方法Task
。因为我们不需要将此块链接到任何东西,所以我们可以使用ActionBlock
.
输出块必须Task
从队列中取出 s,异步等待它们,然后将它们一起发送。但是由于所有块都嵌入了一个队列,并且接受委托的块具有内置的异步等待,这将非常简单:new TransformBlock<Task<TOutput>, TOutput>(t => t)
. 该块将作为队列和输出块工作。因此,我们不必处理任何同步。
最后一块拼图实际上是并行处理项目。为此,我们可以使用另一个ActionBlock
,这次使用MaxDegreeOfParallelism
set。它将接受输入,处理它,并Task
在队列中设置正确的结果。
放在一起,它可能看起来像这样:
public static IPropagatorBlock<TInput, TOutput>
CreateConcurrentOrderedTransformBlock<TInput, TOutput>(
Func<TInput, TOutput> transform)
{
var queue = new TransformBlock<Task<TOutput>, TOutput>(t => t);
var processor = new ActionBlock<Tuple<TInput, Action<TOutput>>>(
tuple => tuple.Item2(transform(tuple.Item1)),
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});
var enqueuer = new ActionBlock<TInput>(
async item =>
{
var tcs = new TaskCompletionSource<TOutput>();
await processor.SendAsync(
new Tuple<TInput, Action<TOutput>>(item, tcs.SetResult));
await queue.SendAsync(tcs.Task);
});
enqueuer.Completion.ContinueWith(
_ =>
{
queue.Complete();
processor.Complete();
});
return DataflowBlock.Encapsulate(enqueuer, queue);
}
经过这么多的讨论,我认为这是相当少量的代码。
看来您非常关心性能,因此您可能需要微调此代码。例如,MaxDegreeOfParallelism
将processor
块设置为类似的东西可能是有意义的Environment.ProcessorCount
,以避免超额订阅。此外,如果延迟对您来说比吞吐量更重要,那么将相同的块设置为 1(或另一个小数字)可能是有意义MaxMessagesPerTask
的,这样当一个项目的处理完成时,它会立即发送到输出。
此外,如果你想限制传入的项目,你可以设置BoundedCapacity
.enqueuer