12

我在不同的任务上运行了一个非常典型的生产者/消费者模型。

Task1:从二进制文件中读取批量字节[],并为每个字节数组集合启动一个新任务。(出于内存管理目的,该操作是批处理的)。

任务 2-n:这些是工作任务,每个都对传入的字节数组集合(来自 Tasks1)进行操作,并对字节数组进行反序列化,按特定标准对其进行排序,然后存储结果对象的集合(每个字节数组反序列化为此类对象)在并发字典中。

任务 (n+1) 我选择了并发字典,因为此任务的工作是合并存储在并发字典中的那些集合,其顺序与它们源自 Task1 的顺序相同。我通过从 Task1 一直向下传递到此任务的 collectionID(它是 int 类型并为 Task1 中的每个新集合递增)来实现这一点。该任务主要检查下一个预期的 collectionID 是否已经存储在并发字典中,如果是,则将其取出,将其添加到最终队列并检查并发字典中的下一个集合。

现在,从我阅读的内容和观看的视频来看,在我看来,TPL Dataflow 可能是这种生产者/消费者模型的完美候选者。我只是似乎无法设计一个设计并因此开始,因为我从未使用过 TPL Dataflow。就吞吐量和延迟而言,这个库是否能胜任这项任务?我目前在结果集合中每秒处理 250 万字节数组和对象。TPL 数据流可以帮助简化吗?我对以下问题的答案特别感兴趣:TPL Dataflow 在生成工作任务并在工作任务完成后重新合并它们时,是否可以保留来自 Task1 的收集批次的顺序?它会优化事物吗?在剖析了整个结构之后,我觉得由于旋转和涉及太多并发集合而浪费了相当多的时间。

有什么想法、想法吗?

4

2 回答 2

11

编辑:原来我错了。TransformBlock 确实以它们进入的顺序返回项目,即使它被配置为并行。因此,我原始答案中的代码完全没用,TransformBlock可以使用正常代替。


原答案:

据我所知,.Net 中只有一个并行结构支持按照它们进入的顺序返回已处理的项目:PLINQ with AsOrdered(). 但在我看来,PLINQ 并不适合你想要的。

另一方面,我认为 TPL Dataflow 非常适合,但它没有一个块可以支持并行性和同时按顺序返回项目(TransformBlock支持它们两者,但不是同时)。幸运的是,Dataflow 块的设计考虑了可组合性,因此我们可以构建自己的块来实现这一点。

但首先,我们必须弄清楚如何对结果进行排序。像您建议的那样,使用并发字典以及一些同步机制肯定会起作用。但我认为有一个更简单的解决方案:使用Tasks 的队列。在输出任务中,您将 a 出列Task,等待它完成(异步),当它完成时,您将其结果一起发送。当队列为空时,我们仍然需要一些同步,但是如果我们巧妙地选择使用哪个队列,我们​​可以免费获得。

所以,一般的想法是这样的:我们写的是一个IPropagatorBlock,有一些输入和一些输出。创建自定义的最简单方法IPropagatorBlock是创建一个处理输入的块,另一个块产生结果并将它们视为一个使用DataflowBlock.Encapsulate().

输入块必须以正确的顺序处理传入的项目,因此那里没有并行化。它将创建一个新的Task(实际上是 a TaskCompletionSource,以便我们可以设置Task后者的结果),将其添加到队列中,然后发送该项目进行处理,以及一些设置正确结果的方法Task。因为我们不需要将此块链接到任何东西,所以我们可以使用ActionBlock.

输出块必须Task从队列中取出 s,异步等待它们,然后将它们一起发送。但是由于所有块都嵌入了一个队列,并且接受委托的块具有内置的异步等待,这将非常简单:new TransformBlock<Task<TOutput>, TOutput>(t => t). 该块将作为队列和输出块工作。因此,我们不必处理任何同步。

最后一块拼图实际上是并行处理项目。为此,我们可以使用另一个ActionBlock,这次使用MaxDegreeOfParallelismset。它将接受输入,处理它,并Task在队列中设置正确的结果。

放在一起,它可能看起来像这样:

public static IPropagatorBlock<TInput, TOutput>
    CreateConcurrentOrderedTransformBlock<TInput, TOutput>(
    Func<TInput, TOutput> transform)
{
    var queue = new TransformBlock<Task<TOutput>, TOutput>(t => t);

    var processor = new ActionBlock<Tuple<TInput, Action<TOutput>>>(
        tuple => tuple.Item2(transform(tuple.Item1)),
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
        });

    var enqueuer = new ActionBlock<TInput>(
        async item =>
        {
            var tcs = new TaskCompletionSource<TOutput>();
            await processor.SendAsync(
                new Tuple<TInput, Action<TOutput>>(item, tcs.SetResult));
            await queue.SendAsync(tcs.Task);
        });

    enqueuer.Completion.ContinueWith(
        _ =>
        {
            queue.Complete();
            processor.Complete();
        });

    return DataflowBlock.Encapsulate(enqueuer, queue);
}

经过这么多的讨论,我认为这是相当少量的代码。

看来您非常关心性能,因此您可能需要微调此代码。例如,MaxDegreeOfParallelismprocessor块设置为类似的东西可能是有意义的Environment.ProcessorCount,以避免超额订阅。此外,如果延迟对您来说比吞吐量更重要,那么将相同的块设置为 1(或另一个小数字)可能是有意义MaxMessagesPerTask的,这样当一个项目的处理完成时,它会立即发送到输出。

此外,如果你想限制传入的项目,你可以设置BoundedCapacity.enqueuer

于 2012-06-15T17:46:02.320 回答
0

是的,TPL 数据流库非常适合这项工作。它支持您需要的所有功能:MaxDegreeOfParallelism和。但是使用该选项需要注意细节。BoundedCapacityEnsureOrderedBoundedCapacity

首先,您必须确保使用该SendAsync方法提供管道中的第一个块。否则,如果您使用该Post方法并忽略其返回值,您可能会丢失消息。永远不会丢失消息,因为它SendAsync会异步阻塞调用者,直到块的内部缓冲区中有传入消息的可用空间。

其次,您必须确保下游块中可能出现的异常不会无限期地阻塞馈线,等待永远不会到来的可用空间。没有内置的方法可以通过配置块来自动实现这一点。相反,您必须手动将下游块的完成传播到上游块。这是PropagateFailure以下示例中方法的意图:

public static async Task ProcessAsync(string[] filePaths,
    ConcurrentQueue<MyClass> finalQueue)
{
    var reader = new TransformBlock<string, byte[]>(filePath =>
    {
        byte[] result = ReadBinaryFile(filePath);
        return result;
    }, new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = 1, // This is the default
        BoundedCapacity = 20, // keep memory usage under control
        EnsureOrdered = true // This is also the default
    });

    var deserializer = new TransformBlock<byte[], MyClass>(bytes =>
    {
        MyClass result = Deserialize(bytes);
        return result;
    }, new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = Environment.ProcessorCount,
        BoundedCapacity = 20
    });

    var writer = new ActionBlock<MyClass>(obj =>
    {
        finalQueue.Enqueue(obj);
    });

    reader.LinkTo(deserializer,
        new DataflowLinkOptions() { PropagateCompletion = true });
    PropagateFailure(deserializer, reader); // Link backwards

    deserializer.LinkTo(writer,
        new DataflowLinkOptions() { PropagateCompletion = true });
    PropagateFailure(writer, deserializer); // Link backwards

    foreach (var filePath in filePaths)
    {
        var accepted = await reader.SendAsync(filePath).ConfigureAwait(false);
        if (!accepted) break; // This will happen in case that the block has failed
    }
    reader.Complete(); // This will be ignored if the block has already failed

    await writer.Completion; // This will propagate the first exception that occurred
}

public static async void PropagateFailure(IDataflowBlock block1,
    IDataflowBlock block2)
{
    try { await block1.Completion.ConfigureAwait(false); }
    catch (Exception ex) { block2.Fault(ex); }
}
于 2020-06-07T16:32:15.610 回答