5

鉴于以下情况:

BufferBlock<int> sourceBlock = new BufferBlock<int>();
TransformBlock<int, int> targetBlock = new TransformBlock<int, int>(element =>
{
    return element * 2;
});

sourceBlock.LinkTo(targetBlock, new DataflowLinkOptions { PropagateCompletion = true });

//feed some elements into the buffer block
for(int i = 1; i <= 1000000; i++)
{
    sourceBlock.SendAsync(i);
}

sourceBlock.Complete();

targetBlock.Completion.ContinueWith(_ =>
{
    //notify completion of the target block
});

targetBlock似乎永远不会完成,我认为原因是其中的所有项目都TransformBlock targetBlock在输出队列中等待,因为我没有将其链接targetBlock到任何其他 Dataflow 块。但是,我真正想要实现的是当 (A)targetBlock被通知完成并且 (B) 输入队列为空时的通知。我不想关心项目是否仍然位于TransformBlock. 我该怎么办?是获取我想要查询sourceBlockAND 的完成状态以确保 的InputCount为零targetBlock的唯一方法吗?我不确定这是否非常稳定(sourceBlock如果最后一项sourceBlock已传递给targetBlock?)。有没有更优雅、更有效的方式来实现相同的目标?

编辑:我刚刚注意到,即使是“肮脏”的方式来检查sourceBlock与为零InputCount的完成情况targetBlock也并非易事。那个街区会坐在哪里?它不能在其中,targetBlock因为一旦满足上述两个条件,显然就targetBlock不再处理任何消息。还检查完成状态sourceBlock引入了很多低效率。

4

3 回答 3

1

我相信你不能直接这样做。您可以private使用反射从某些字段中获取此信息,但我不建议您这样做。

但是您可以通过创建自定义块来做到这一点。在这种情况下Complete()很简单:只需创建一个将每个方法转发到原始块的块。除了Complete(),它也会在哪里记录它。

在确定所有项目的处理何时完成的情况下,您可以将您的块链接到一个中间BufferBlock. 这样,输出队列将被快速清空,因此检查Completed内部块将为您提供相当准确的处理何时完成的测量。这会影响您的测量,但希望不会显着。

另一种选择是在块委托的末尾添加一些日志记录。这样,您可以看到最后一个项目的处理完成时间。

于 2012-11-28T17:13:25.840 回答
1

TransformBlock如果当块完成其队列中所有消息的处理时有一个ProcessingCompleted事件会触发,但没有这样的事件,那就太好了。以下是纠正这一遗漏的尝试。该CreateTransformBlockEx方法接受一个Action<Exception>处理程序,该处理程序在此“事件”发生时被调用。

目的是始终在块最终完成之前调用处理程序。不幸的是,在提供CancellationToken被取消的情况下,完成(取消)首先发生,并且处理程序在几毫秒后被调用。要解决这种不一致,需要一些棘手的解决方法,并且可能会有其他不需要的副作用,所以我将其保留原样。

public static IPropagatorBlock<TInput, TOutput>
    CreateTransformBlockEx<TInput, TOutput>(Func<TInput, Task<TOutput>> transform,
    Action<Exception> onProcessingCompleted,
    ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
    if (onProcessingCompleted == null)
        throw new ArgumentNullException(nameof(onProcessingCompleted));
    dataflowBlockOptions = dataflowBlockOptions ?? new ExecutionDataflowBlockOptions();

    var transformBlock = new TransformBlock<TInput, TOutput>(transform,
        dataflowBlockOptions);
    var bufferBlock = new BufferBlock<TOutput>(dataflowBlockOptions);

    transformBlock.LinkTo(bufferBlock);
    PropagateCompletion(transformBlock, bufferBlock, onProcessingCompleted);
    return DataflowBlock.Encapsulate(transformBlock, bufferBlock);

    async void PropagateCompletion(IDataflowBlock block1, IDataflowBlock block2,
        Action<Exception> completionHandler)
    {
        try
        {
            await block1.Completion.ConfigureAwait(false);
        }
        catch { }
        var exception = 
            block1.Completion.IsFaulted ? block1.Completion.Exception : null;
        try
        {
            // Invoke the handler before completing the second block
            completionHandler(exception);
        }
        finally
        {
            if (exception != null) block2.Fault(exception); else block2.Complete();
        }
    }
}

// Overload with synchronous lambda
public static IPropagatorBlock<TInput, TOutput>
    CreateTransformBlockEx<TInput, TOutput>(Func<TInput, TOutput> transform,
    Action<Exception> onProcessingCompleted,
    ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
    return CreateTransformBlockEx<TInput, TOutput>(
        x => Task.FromResult(transform(x)), onProcessingCompleted,
        dataflowBlockOptions);
}

当使用选项调用时,本地函数的代码PropagateCompletion模仿内置方法的源代码。LinkToPropagateCompletion = true

使用示例:

var httpClient = new HttpClient();
var downloader = CreateTransformBlockEx<string, string>(async url =>
{
    return await httpClient.GetStringAsync(url);
}, onProcessingCompleted: ex =>
{
    Console.WriteLine($"Download completed {(ex == null ? "OK" : "Error")}");
}, new ExecutionDataflowBlockOptions()
{
    MaxDegreeOfParallelism = 10
});
于 2020-06-08T11:01:27.680 回答
0

首先,将 IPropagator 块用作叶终端是不正确的。但是您的要求仍然可以通过异步检查 TargetBlock 的输出缓冲区中的输出消息然后消费然后清空缓冲区来满足。

    `  BufferBlock<int> sourceBlock = new BufferBlock<int>();
       TransformBlock<int, int> targetBlock = new TransformBlock<int, int> 
       (element =>
       {

        return element * 2;
        });
        sourceBlock.LinkTo(targetBlock, new DataflowLinkOptions { 
        PropagateCompletion = true });

        //feed some elements into the buffer block
        for (int i = 1; i <= 100; i++)
        {
             sourceBlock.SendAsync(i);
        }

        sourceBlock.Complete();

        bool isOutputAvailable = await targetBlock.OutputAvailableAsync();
        while(isOutputAvailable)
        {
            int value = await targetBlock.ReceiveAsync();

            isOutputAvailable = await targetBlock.OutputAvailableAsync();
        }


        await targetBlock.Completion.ContinueWith(_ =>
        {
            Console.WriteLine("Target Block Completed");//notify completion of the target block
        });

`

于 2020-01-01T08:23:04.183 回答