数据处理管道和瞬态故障处理似乎是齐头并进的,所以我很想看看我是否可以得到两个最好的库——分别是TPL Dataflow和Polly——一起玩得很好。
作为起点,我想将故障处理策略应用于ActionBlock
. 理想情况下,我想将它封装在一个带有这样签名的块创建方法中:
ITargetBlock<T> CreatePollyBlock<T>(
Action<T> act, ExecutionDataflowBlockOptions opts, Polly.Policy policy)
简单地policy.Execute
从内部进行操作就很容易了ActionBlock
,但我有这两个要求:
- 在重试的情况下,我不希望重试一个项目优先于其他排队的项目。换句话说,当你失败时,你会排在队伍的后面。
- 更重要的是,如果在重试之前有等待期,我不希望它阻止新项目进入。如果
ExecutionDataflowBlockOptions.MaxDegreeOfParallelism
设置了,我不希望等待重试的项目针对该最大值“计数”。
为了满足这些要求,我认为我需要一个应用ActionBlock
了用户提供的“内部ExecutionDataflowBlockOptions
”块,以及一些将项目发布到内部块并在外部应用任何等待和重试逻辑(或任何策略规定)的“外部”块内部块的上下文。这是我的第一次尝试:
// wrapper that provides a data item with mechanism to await completion
public class WorkItem<T>
{
private readonly TaskCompletionSource<byte> _tcs = new TaskCompletionSource<byte>();
public T Data { get; set; }
public Task Completion => _tcs.Task;
public void SetCompleted() => _tcs.SetResult(0);
public void SetFailed(Exception ex) => _tcs.SetException(ex);
}
ITargetBlock<T> CreatePollyBlock<T>(Action<T> act, Policy policy, ExecutionDataflowBlockOptions opts) {
// create a block that marks WorkItems completed, and allows
// items to fault without faulting the entire block.
var innerBlock = new ActionBlock<WorkItem<T>>(wi => {
try {
act(wi.Data);
wi.SetCompleted();
}
catch (Exception ex) {
wi.SetFailed(ex);
}
}, opts);
return new ActionBlock<T>(async x => {
await policy.ExecuteAsync(async () => {
var workItem = new WorkItem<T> { Data = x };
await innerBlock.SendAsync(workItem);
await workItem.Completion;
});
});
}
为了测试它,我创建了一个带有等待重试策略和一个虚拟方法的块,该方法在前 3 次调用(应用程序范围)时抛出异常。然后我给它一些数据:
"a", "b", "c", "d", "e", "f"
我希望 a、b 和 c 失败并排在最后。但我观察到他们按以下顺序击中内部块的动作:
"a", "a", "a", "a", "b", "c", "d", "e", "f"
从本质上讲,我没有满足自己的要求,而且很容易看出原因:在当前项目的所有重试发生之前,外部块不会让新项目进入。一个简单但看似 hackish 的解决方案是为MaxDegreeOfParallelism
外部块添加一个大值:
return new ActionBlock<T>(async x => {
await policy.ExecuteAsync(async () => {
var workItem = new WorkItem<T> { Data = x };
await innerBlock.SendAsync(workItem);
await workItem.Completion;
});
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 100 });
通过这种更改,我观察到新项目确实会在重试之前进入,但我也引发了一些混乱。进入内部块已经成为随机的,尽管前 3 项总是在最后:
"a", "e", "b", "c", "d", "a", "e", "b"
所以这个稍微好一点。但理想情况下,我希望看到订单保留:
"a", "b", "c", "d", "e", "a", "b", "c"
这就是我卡住的地方,对此进行推理我想知道在这些限制下是否有可能,特别是内部CreatePollyBlock
可以执行策略但不能定义它。例如,如果这些内部可以提供重试 lambda,我认为这会给我更多的选择。但这是政策定义的一部分,根据这个设计,我不能这样做。
提前感谢您的帮助。