5

我寻找可以通过 n-TransformBlocks 链接到 JoinBlock 的替代方法,并将所有 TransformBlock 源块的消息连接/合并在一起,以便将此类集合传递到另一个数据流块。

JoinBlock 可以很好地完成这项工作,但仅限于连接最多 3 个源块。它还存在很多效率低下的问题(连接 2 个源块的偶数值类型(整数)非常慢)。有没有办法让从 TransformBlocks 返回的 Tasks 并等到所有 TransformBlocks 都有完成的任务才能在接受之前传递Task<item>

有什么替代的想法吗?在传递加入的项目集合之前,我可能有 1-20 个这样的转换块,我需要将哪些项目连接在一起。每个变换块保证为每个“变换”的输入项准确返回一个输出项。

编辑:要求澄清:

根据我之前的一个问题,我将 JoinBlocks 设置如下:

public Test()
{
    broadCastBlock = new BroadcastBlock<int>(i =>
        {
            return i;
        });

    transformBlock1 = new TransformBlock<int, int>(i =>
        {
            return i;
        });

    transformBlock2 = new TransformBlock<int, int>(i =>
        {
            return i;
        });

    joinBlock = new JoinBlock<int, int>();

    processorBlock = new ActionBlock<Tuple<int, int>>(tuple =>
        {
            //Console.WriteLine("tfb1: " + tuple.Item1 + "tfb2: " + tuple.Item2);
        });

    //Linking
    broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
    broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
    transformBlock1.LinkTo(joinBlock.Target1);
    transformBlock2.LinkTo(joinBlock.Target2);
    joinBlock.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
}

public void Start()
{
    Stopwatch watch = new Stopwatch();
    watch.Start();

    const int numElements = 1000000;

    for (int i = 1; i <= numElements; i++)
    {
        broadCastBlock.Post(i);
    }

    ////mark completion
    broadCastBlock.Complete();
    Task.WhenAll(transformBlock1.Completion, transformBlock2.Completion).ContinueWith(_ => joinBlock.Complete());


    processorBlock.Completion.Wait();

    watch.Stop();

    Console.WriteLine("Time it took: " + watch.ElapsedMilliseconds + " - items processed per second: " + numElements / watch.ElapsedMilliseconds * 1000);
    Console.ReadLine();
}
4

2 回答 2

6

一种方法是使用BatchBlockwith Greedyset to false。在这个配置中,块不会做任何事情,直到有n来自n不同块的项目等待它被消耗(在哪里n是你在创建时设置的数字BatchBlock)。发生这种情况时,它会立即消耗所有n项目并生成一个包含所有项目的数组。

此解决方案的一个警告是生成的数组未排序:您不会知道哪个项目来自哪个来源。而且我不知道它的性能与它相比如何JoinBlock,您必须自己测试。(虽然我会理解如果使用BatchBlock这种方式会更慢,因为非贪婪消费所需的开销。)

于 2012-12-02T10:16:16.347 回答
0

如果您想为每个项目执行多个并行操作,恕我直言,在单个块内执行这些操作更有意义,而不是将它们拆分为多个块,然后尝试再次将独立结果连接到单个对象中。所以我的建议是做这样的事情:

var block = new TransformBlock<MyClass, MyClass>(async item =>
{
    Task<SomeType1> task1 = Task.Run(() => CalculateProperty1(item.Id));
    Task<SomeType2> task2 = Task.Run(() => CalculateProperty2(item.Id));
    await Task.WhenAll(task1, task2).ConfigureAwait(false);
    item.Property1 = task1.Result;
    item.Property2 = task2.Result;
    return item;
}, new ExecutionDataflowBlockOptions()
{
    MaxDegreeOfParallelism = 2
});

在上面的示例中,类型MyClass的项目通过TransformBlock. 每个项目的属性Property1Property2使用单独Task的每个属性并行计算。然后等待两个任务,当两个任务都完成时,结果将分配给项目的属性。最后返回处理后的项目。

对于这种方法,您唯一需要注意的是并行度将是内部并行操作和MaxDegreeOfParallelism块选项的乘积。因此,在上面的示例中,并行度将为 2 x 2 = 4。准确地说,这将是最大并行度,因为两个内部计算中的一个可能会比另一个慢。因此,在任何给定时刻,实际并行度可能在 2 到 4 之间。

于 2020-06-11T10:59:11.887 回答