30

当两个变换块都完成时,如何重写代码完成的代码?我认为完成意味着它被标记为完成并且“out queue”是空的?

public Test()
    {
        broadCastBlock = new BroadcastBlock<int>(i =>
            {
                return i;
            });

        transformBlock1 = new TransformBlock<int, string>(i =>
            {
                Console.WriteLine("1 input count: " + transformBlock1.InputCount);
                Thread.Sleep(50);
                return ("1_" + i);
            });

        transformBlock2 = new TransformBlock<int, string>(i =>
            {
                Console.WriteLine("2 input count: " + transformBlock1.InputCount);
                Thread.Sleep(20);
                return ("2_" + i);
            });

        processorBlock = new ActionBlock<string>(i =>
            {
                Console.WriteLine(i);
            });

        //Linking
        broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
        broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
        transformBlock1.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
        transformBlock2.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
    }

    public void Start()
    {
        const int numElements = 100;

        for (int i = 1; i <= numElements; i++)
        {
            broadCastBlock.SendAsync(i);
        }

        //mark completion
        broadCastBlock.Complete();

        processorBlock.Completion.Wait();

        Console.WriteLine("Finished");
        Console.ReadLine();
    }
}

我编辑了代码,为每个转换块添加了输入缓冲区计数。很明显,所有 100 个项目都流式传输到每个变换块。但是一旦一个变换块完成,处理器块就不再接受任何项目,而是不完整的变换块的输入缓冲区只是刷新输入缓冲区。

4

5 回答 5

36

这个问题正是 casperOne 在他的回答中所说的。一旦第一个转换块完成,处理器块进入“完成模式”:它将处理其输入队列中的剩余项目,但不会接受任何新项目。

有一个比将处理器块一分为二更简单的解决方法:不要设置PropagateCompletion,而是在两个转换块都完成时手动设置处理器块的完成:

Task.WhenAll(transformBlock1.Completion, transformBlock2.Completion)
    .ContinueWith(_ => processorBlock.Complete());
于 2012-11-23T21:52:49.373 回答
30

这里的问题是,每次调用链接块的方法以及转换块中不同的等待时间时,您都在设置PropagateCompletion属性。LinkTo

从接口上的Complete方法的文档(强调我的):IDataflowBlock

向 IDataflowBlock 发出信号,表明它不应接受或产生任何更多消息,也不应消耗任何更多延迟的消息

因为您在每个TransformBlock<TInput, TOutput>实例中错开等待时间,所以transformBlock2(等待 20 毫秒)在transformBlock1(等待 50 毫秒)之前完成。 transformBlock2首先完成,然后发送信号processorBlock,然后说“我不接受其他任何东西”(并且transformBlock1还没有产生所有的消息)。

注意transformBlock1before的处理transformBlock1不是绝对保证的;线程池(假设您使用默认调度程序)以不同的顺序处理任务是可行的(但很可能不会,因为一旦完成 20 毫秒的项目,它将从队列中窃取工作)。

您的管道如下所示:

           broadcastBlock
          /              \
 transformBlock1   transformBlock2
          \              /
           processorBlock

为了解决这个问题,您需要一个如下所示的管道:

           broadcastBlock
          /              \
 transformBlock1   transformBlock2
          |              |
 processorBlock1   processorBlock2

只需创建两个单独的ActionBlock<TInput>实例即可完成,如下所示:

// The action, can be a method, makes it easier to share.
Action<string> a = i => Console.WriteLine(i);

// Create the processor blocks.
processorBlock1 = new ActionBlock<string>(a);
processorBlock2 = new ActionBlock<string>(a);


// Linking
broadCastBlock.LinkTo(transformBlock1, 
    new DataflowLinkOptions { PropagateCompletion = true });
broadCastBlock.LinkTo(transformBlock2, 
    new DataflowLinkOptions { PropagateCompletion = true });
transformBlock1.LinkTo(processorBlock1, 
    new DataflowLinkOptions { PropagateCompletion = true });
transformBlock2.LinkTo(processorBlock2, 
    new DataflowLinkOptions { PropagateCompletion = true });

然后,您需要等待两个处理器块,而不仅仅是一个:

Task.WhenAll(processorBlock1.Completion, processorBlock2.Completion).Wait();

这里有一个非常重要的注意事项;创建 时ActionBlock<TInput>,默认设置是将传递给它的实例上的MaxDegreeOfParallelism属性设置为 1。ExecutionDataflowBlockOptions

这意味着您传递给Action<T>委托ActionBlock<TInput>的调用是线程安全的,一次只会执行一个。

因为您现在有两个 ActionBlock<TInput>实例指向同一个Action<T>委托,所以不能保证线程安全。

如果您的方法是线程安全的,那么您不必做任何事情(这将允许您将MaxDegreeOfParallelism属性设置为DataflowBlockOptions.Unbounded,因为没有理由阻止)。

如果它不是线程安全的,并且您需要保证它,则需要求助于传统的同步原语,例如lockstatement

在这种情况下,您可以这样做(尽管显然不需要,因为上的WriteLine方法是线程安全的):Console

// The lock.
var l = new object();

// The action, can be a method, makes it easier to share.
Action<string> a = i => {
    // Ensure one call at a time.
    lock (l) Console.WriteLine(i);
};

// And so on...
于 2012-11-23T17:07:00.107 回答
9

svick 回答的补充:为了与您使用 PropagateCompletion 选项获得的行为一致,您还需要转发异常以防前一个块出现故障。像下面这样的扩展方法也可以解决这个问题:

public static void CompleteWhenAll(this IDataflowBlock target, params IDataflowBlock[] sources) {
    if (target == null) return;
    if (sources.Length == 0) { target.Complete(); return; }
    Task.Factory.ContinueWhenAll(
        sources.Select(b => b.Completion).ToArray(),
        tasks => {
            var exceptions = (from t in tasks where t.IsFaulted select t.Exception).ToList();
            if (exceptions.Count != 0) {
                target.Fault(new AggregateException(exceptions));
            } else {
                target.Complete();
            }
        }
    );
}
于 2013-04-09T09:49:52.160 回答
1

其他答案很清楚为什么 PropagateCompletion=true 当一个块有两个以上的来源时会搞砸。

要为该问题提供一个简单的解决方案,您可能需要查看一个开源库DataflowEx,它通过内置更智能的完成规则来解决此类问题。(它在内部使用 TPL 数据流链接,但支持复杂的完成传播。实现看起来类似于 WhenAll,但也处理动态链接添加。请检查Dataflow.RegisterDependency()TaskEx.AwaitableWhenAll()了解详细信息。)

我稍微更改了您的代码,以使用 DataflowEx 使一切正常:

public CompletionDemo1()
{
    broadCaster = new BroadcastBlock<int>(
        i =>
            {
                return i;
            }).ToDataflow();

    transformBlock1 = new TransformBlock<int, string>(
        i =>
            {
                Console.WriteLine("1 input count: " + transformBlock1.InputCount);
                Thread.Sleep(50);
                return ("1_" + i);
            });

    transformBlock2 = new TransformBlock<int, string>(
        i =>
            {
                Console.WriteLine("2 input count: " + transformBlock2.InputCount);
                Thread.Sleep(20);
                return ("2_" + i);
            });

    processor = new ActionBlock<string>(
        i =>
            {
                Console.WriteLine(i);
            }).ToDataflow();

    /** rather than TPL linking
      broadCastBlock.LinkTo(transformBlock1, new DataflowLinkOptions { PropagateCompletion = true });
      broadCastBlock.LinkTo(transformBlock2, new DataflowLinkOptions { PropagateCompletion = true });
      transformBlock1.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
      transformBlock2.LinkTo(processorBlock, new DataflowLinkOptions { PropagateCompletion = true });
     **/

    //Use DataflowEx linking
    var transform1 = transformBlock1.ToDataflow();
    var transform2 = transformBlock2.ToDataflow();

    broadCaster.LinkTo(transform1);
    broadCaster.LinkTo(transform2);
    transform1.LinkTo(processor);
    transform2.LinkTo(processor);
}

完整代码在这里

免责声明:我是 DataflowEx 的作者,它是在 MIT 许可下发布的。

于 2014-12-19T09:01:19.657 回答
1

这是一个功能上与 pkt 的方法等效的CompleteWhenAll方法,但代码略少:

public static async void PropagateCompletion(IDataflowBlock[] sources,
    IDataflowBlock target)
{
    // Arguments validation omitted
    Task allSourcesCompletion = Task.WhenAll(sources.Select(s => s.Completion));

    try { await allSourcesCompletion.ConfigureAwait(false); } catch { }

    var exception = allSourcesCompletion.IsFaulted ?
        allSourcesCompletion.Exception : null;

    if (exception != null) target.Fault(exception); else target.Complete();
}

使用示例:

PropagateCompletion(new[] { transformBlock1, transformBlock2 }, processorBlock);

PropagateCompletion方法是我在此处发布的具有相同名称的更通用方法的变体。

于 2020-05-06T15:30:12.790 回答