TransformBlock
如果当块完成其队列中所有消息的处理时有一个ProcessingCompleted
事件会触发,但没有这样的事件,那就太好了。以下是纠正这一遗漏的尝试。该CreateTransformBlockEx
方法接受一个Action<Exception>
处理程序,该处理程序在此“事件”发生时被调用。
目的是始终在块最终完成之前调用处理程序。不幸的是,在提供CancellationToken
被取消的情况下,完成(取消)首先发生,并且处理程序在几毫秒后被调用。要解决这种不一致,需要一些棘手的解决方法,并且可能会有其他不需要的副作用,所以我将其保留原样。
public static IPropagatorBlock<TInput, TOutput>
CreateTransformBlockEx<TInput, TOutput>(Func<TInput, Task<TOutput>> transform,
Action<Exception> onProcessingCompleted,
ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
if (onProcessingCompleted == null)
throw new ArgumentNullException(nameof(onProcessingCompleted));
dataflowBlockOptions = dataflowBlockOptions ?? new ExecutionDataflowBlockOptions();
var transformBlock = new TransformBlock<TInput, TOutput>(transform,
dataflowBlockOptions);
var bufferBlock = new BufferBlock<TOutput>(dataflowBlockOptions);
transformBlock.LinkTo(bufferBlock);
PropagateCompletion(transformBlock, bufferBlock, onProcessingCompleted);
return DataflowBlock.Encapsulate(transformBlock, bufferBlock);
async void PropagateCompletion(IDataflowBlock block1, IDataflowBlock block2,
Action<Exception> completionHandler)
{
try
{
await block1.Completion.ConfigureAwait(false);
}
catch { }
var exception =
block1.Completion.IsFaulted ? block1.Completion.Exception : null;
try
{
// Invoke the handler before completing the second block
completionHandler(exception);
}
finally
{
if (exception != null) block2.Fault(exception); else block2.Complete();
}
}
}
// Overload with synchronous lambda
public static IPropagatorBlock<TInput, TOutput>
CreateTransformBlockEx<TInput, TOutput>(Func<TInput, TOutput> transform,
Action<Exception> onProcessingCompleted,
ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
return CreateTransformBlockEx<TInput, TOutput>(
x => Task.FromResult(transform(x)), onProcessingCompleted,
dataflowBlockOptions);
}
当使用选项调用时,本地函数的代码PropagateCompletion
模仿内置方法的源代码。LinkTo
PropagateCompletion = true
使用示例:
var httpClient = new HttpClient();
var downloader = CreateTransformBlockEx<string, string>(async url =>
{
return await httpClient.GetStringAsync(url);
}, onProcessingCompleted: ex =>
{
Console.WriteLine($"Download completed {(ex == null ? "OK" : "Error")}");
}, new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = 10
});