- 调用者希望处理所有项目,即使其中一些项目反复失败。
- 调用者希望了解所有发生过的异常,即使是最终成功的项目也不例外(不适用于 `CreateRetryActionBlock`,因为它不输出结果)。
- 调用者可能希望设置总重试次数的上限,在此之后块应转换为故障状态。
- 调用者希望在与重试功能相关的选项之外,还能设置普通数据流块的所有可用选项,包括 `MaxDegreeOfParallelism`、`BoundedCapacity`、`CancellationToken` 和 `EnsureOrdered`。
下面的实现使用一个 `SemaphoreSlim` 来限制同时执行的操作数量:首次尝试的操作和延迟后重试的操作都必须先获取该信号量。
/// <summary>
/// Options for the retry-capable dataflow blocks. Extends the standard
/// <see cref="ExecutionDataflowBlockOptions"/> with per-item and global
/// retry settings.
/// </summary>
public class RetryExecutionDataflowBlockOptions : ExecutionDataflowBlockOptions
{
    /// <summary>The limit after which an item is returned as failed.</summary>
    public int MaxAttemptsPerItem { get; set; } = 1;

    /// <summary>The delay duration before retrying an item.</summary>
    public TimeSpan RetryDelay { get; set; } = TimeSpan.Zero;

    /// <summary>The limit after which the block transitions to a faulted
    /// state (unlimited is the default).</summary>
    public int MaxRetriesTotal { get; set; } = -1;
}
/// <summary>
/// Immutable outcome of processing a single item: the input, the output of
/// the attempt that succeeded (if any), and the exceptions thrown by the
/// failed attempts.
/// </summary>
/// <typeparam name="TInput">Type of the processed item.</typeparam>
/// <typeparam name="TOutput">Type of the produced value.</typeparam>
public readonly struct RetryResult<TInput, TOutput>
{
    /// <summary>The item that was processed.</summary>
    public TInput Input { get; }

    /// <summary>The output value passed to the constructor.</summary>
    public TOutput Output { get; }

    /// <summary>Whether the item was eventually processed successfully.</summary>
    public bool Success { get; }

    /// <summary>The exceptions of the failed attempts; may be <c>null</c>.</summary>
    public Exception[] Exceptions { get; }

    /// <summary>The negation of <see cref="Success"/>.</summary>
    public bool Failed => !Success;

    /// <summary>The first recorded exception, or <c>null</c> when there is none.</summary>
    public Exception FirstException =>
        // Guard the empty array too, so that this property never throws.
        Exceptions != null && Exceptions.Length > 0 ? Exceptions[0] : null;

    /// <summary>
    /// The number of attempts: one per recorded exception, plus one for the
    /// successful attempt. A <c>null</c> exception array counts as one attempt.
    /// </summary>
    public int Attempts =>
        Exceptions != null ? Exceptions.Length + (Success ? 1 : 0) : 1;

    /// <summary>Initializes a result with the given values.</summary>
    /// <param name="input">The processed item.</param>
    /// <param name="output">The produced value.</param>
    /// <param name="success">Whether processing succeeded.</param>
    /// <param name="exceptions">Exceptions of the failed attempts, or <c>null</c>.</param>
    public RetryResult(TInput input, TOutput output, bool success,
        Exception[] exceptions)
    {
        Input = input;
        Output = output;
        Success = success;
        Exceptions = exceptions;
    }
}
/// <summary>
/// Thrown when the total number of failed attempts exceeds the configured
/// <c>MaxRetriesTotal</c> limit.
/// </summary>
public class RetryLimitException : Exception
{
    /// <summary>Initializes the exception with a message and its cause.</summary>
    /// <param name="message">Description of the exceeded limit.</param>
    /// <param name="innerException">The exception that pushed the count
    /// over the limit.</param>
    public RetryLimitException(string message, Exception innerException)
        : base(message, innerException)
    {
    }
}
/// <summary>
/// Creates a propagator block that applies <paramref name="transform"/> to
/// each input item, retries the items whose transform throws, and emits a
/// <see cref="RetryResult{TInput, TOutput}"/> for each processed item.
/// </summary>
/// <typeparam name="TInput">Type of the items accepted by the block.</typeparam>
/// <typeparam name="TOutput">Type of the value produced by the transform.</typeparam>
/// <param name="transform">Asynchronous operation to run for each item.</param>
/// <param name="dataflowBlockOptions">Standard block options plus the retry
/// settings (attempts per item, delay between attempts, total limit).</param>
/// <returns>A block that accepts items and offers one result per item.</returns>
/// <exception cref="ArgumentNullException">An argument is <c>null</c>.</exception>
/// <exception cref="ArgumentOutOfRangeException">
/// <c>MaxAttemptsPerItem</c> is less than 1, <c>MaxRetriesTotal</c> is less
/// than -1, or <c>RetryDelay</c> is negative.</exception>
public static IPropagatorBlock<TInput, RetryResult<TInput, TOutput>>
    CreateRetryTransformBlock<TInput, TOutput>(
    Func<TInput, Task<TOutput>> transform,
    RetryExecutionDataflowBlockOptions dataflowBlockOptions)
{
    if (transform == null) throw new ArgumentNullException(nameof(transform));
    if (dataflowBlockOptions == null)
        throw new ArgumentNullException(nameof(dataflowBlockOptions));

    int maxAttemptsPerItem = dataflowBlockOptions.MaxAttemptsPerItem;
    int maxRetriesTotal = dataflowBlockOptions.MaxRetriesTotal;
    TimeSpan retryDelay = dataflowBlockOptions.RetryDelay;
    if (maxAttemptsPerItem < 1) throw new ArgumentOutOfRangeException(
        nameof(dataflowBlockOptions.MaxAttemptsPerItem));
    if (maxRetriesTotal < -1) throw new ArgumentOutOfRangeException(
        nameof(dataflowBlockOptions.MaxRetriesTotal));
    if (retryDelay < TimeSpan.Zero) throw new ArgumentOutOfRangeException(
        nameof(dataflowBlockOptions.RetryDelay));
    var cancellationToken = dataflowBlockOptions.CancellationToken;

    var exceptionsCount = 0;

    // First attempts and retries share one semaphore, so the number of
    // transforms in flight never exceeds MaxDegreeOfParallelism. The
    // Unbounded sentinel (-1) is not a valid initial count for a
    // SemaphoreSlim, so it is mapped to the largest possible count.
    int maxDegreeOfParallelism = dataflowBlockOptions.MaxDegreeOfParallelism;
    var semaphore = new SemaphoreSlim(
        maxDegreeOfParallelism == DataflowBlockOptions.Unbounded
            ? int.MaxValue
            : maxDegreeOfParallelism);

    // Runs a single attempt. Returns the output or the caught exception;
    // throws RetryLimitException once the global failure limit is exceeded.
    async Task<(TOutput, Exception)> ProcessOnceAsync(TInput item)
    {
        await semaphore.WaitAsync(); // Preserve the SynchronizationContext
        try
        {
            var result = await transform(item).ConfigureAwait(false);
            return (result, null);
        }
        catch (Exception ex)
        {
            // -1 means that the total number of failures is unlimited.
            if (maxRetriesTotal != -1)
            {
                if (Interlocked.Increment(ref exceptionsCount) > maxRetriesTotal)
                {
                    throw new RetryLimitException($"The max retry limit " +
                        $"({maxRetriesTotal}) has been reached.", ex);
                }
            }
            return (default, ex);
        }
        finally
        {
            // Always give the slot back, otherwise the block would stall
            // after MaxDegreeOfParallelism attempts.
            semaphore.Release();
        }
    }

    async Task<Task<RetryResult<TInput, TOutput>>> ProcessWithRetryAsync(
        TInput item)
    {
        // Creates a two-stages operation. Preserves the context on every await.
        var (result, firstException) = await ProcessOnceAsync(item);
        if (firstException == null)
        {
            return Task.FromResult(
                new RetryResult<TInput, TOutput>(item, result, true, null));
        }
        return RetryStageAsync();

        // Second stage: repeats the attempt (after the delay) until it
        // succeeds or the per-item attempt limit is reached.
        async Task<RetryResult<TInput, TOutput>> RetryStageAsync()
        {
            var exceptions = new List<Exception>();
            exceptions.Add(firstException);
            for (int attempt = 2; attempt <= maxAttemptsPerItem; attempt++)
            {
                await Task.Delay(retryDelay, cancellationToken);
                var (retryResult, retryException) = await ProcessOnceAsync(item);
                if (retryException == null)
                {
                    return new RetryResult<TInput, TOutput>(item, retryResult,
                        true, exceptions.ToArray());
                }
                exceptions.Add(retryException);
            }
            return new RetryResult<TInput, TOutput>(item, default, false,
                exceptions.ToArray());
        }
    }

    // The input block awaits the first stage of each operation
    var input = new TransformBlock<TInput, Task<RetryResult<TInput, TOutput>>>(
        item => ProcessWithRetryAsync(item), dataflowBlockOptions);

    // The output block awaits the second (and final) stage of each operation
    var output = new TransformBlock<Task<RetryResult<TInput, TOutput>>,
        RetryResult<TInput, TOutput>>(t => t, dataflowBlockOptions);

    input.LinkTo(output, new DataflowLinkOptions { PropagateCompletion = true });

    // In case of failure ensure that the input block is faulted too,
    // so that its input/output queues are emptied, and any pending
    // SendAsync operations are aborted
    PropagateFailure(output, input);

    return DataflowBlock.Encapsulate(input, output);

    // Fire-and-forget observer. The catch block handles every exception of
    // the awaited completion, so nothing escapes from this async void.
    async void PropagateFailure(IDataflowBlock block1, IDataflowBlock block2)
    {
        try { await block1.Completion.ConfigureAwait(false); }
        catch (Exception ex) { block2.Fault(ex); }
    }
}
/// <summary>
/// Creates a target block that runs <paramref name="action"/> for each
/// input item with the same retry behavior as
/// <c>CreateRetryTransformBlock</c>. The per-item results are discarded.
/// </summary>
/// <typeparam name="TInput">Type of the items accepted by the block.</typeparam>
/// <param name="action">Asynchronous operation to run for each item.</param>
/// <param name="dataflowBlockOptions">Standard block options plus the retry
/// settings.</param>
/// <returns>The target side of the underlying retry block.</returns>
/// <exception cref="ArgumentNullException"><paramref name="action"/> is
/// <c>null</c>.</exception>
public static ITargetBlock<TInput> CreateRetryActionBlock<TInput>(
    Func<TInput, Task> action,
    RetryExecutionDataflowBlockOptions dataflowBlockOptions)
{
    if (action == null) throw new ArgumentNullException(nameof(action));

    // Adapt the action to a transform that produces a dummy null output.
    var block = CreateRetryTransformBlock<TInput, object>(async input =>
    {
        await action(input).ConfigureAwait(false); return null;
    }, dataflowBlockOptions);

    // Drain the results into a null target. Without this link the results
    // would stay in the output buffer and the block could never complete.
    var nullTarget = DataflowBlock.NullTarget<RetryResult<TInput, object>>();
    block.LinkTo(nullTarget);

    return block;
}