5

数据处理管道和瞬态故障处理似乎是齐头并进的,所以我很想看看我是否可以得到两个最好的库——分别是TPL DataflowPolly——一起玩得很好。

作为起点,我想将故障处理策略应用于ActionBlock. 理想情况下,我想将它封装在一个带有这样签名的块创建方法中:

ITargetBlock<T> CreatePollyBlock<T>(
    Action<T> act, ExecutionDataflowBlockOptions opts, Polly.Policy policy)

简单地policy.Execute从内部进行操作就很容易了ActionBlock,但我有这两个要求:

  1. 在重试的情况下,我不希望重试一个项目优先于其他排队的项目。换句话说,当你失败时,你会排在队伍的后面。
  2. 更重要的是,如果在重试之前有等待期,我不希望它阻止新项目进入。如果ExecutionDataflowBlockOptions.MaxDegreeOfParallelism设置了,我不希望等待重试的项目针对该最大值“计数”。

为了满足这些要求,我认为我需要一个应用ActionBlock了用户提供的“内部ExecutionDataflowBlockOptions”块,以及一些将项目发布到内部块并在外部应用任何等待和重试逻辑(或任何策略规定)的“外部”内部块的上下文。这是我的第一次尝试:

// wrapper that provides a data item with mechanism to await completion
public class WorkItem<T>
{
    private readonly TaskCompletionSource<byte> _tcs = new TaskCompletionSource<byte>();

    public T Data { get; set; }
    public Task Completion => _tcs.Task;

    public void SetCompleted() => _tcs.SetResult(0);
    public void SetFailed(Exception ex) => _tcs.SetException(ex);
}

ITargetBlock<T> CreatePollyBlock<T>(Action<T> act, Policy policy, ExecutionDataflowBlockOptions opts) {
    // create a block that marks WorkItems completed, and allows
    // items to fault without faulting the entire block.
    var innerBlock = new ActionBlock<WorkItem<T>>(wi => {
        try {
            act(wi.Data);
            wi.SetCompleted();
        }
        catch (Exception ex) {
            wi.SetFailed(ex);
        }
    }, opts);

    return new ActionBlock<T>(async x => {
        await policy.ExecuteAsync(async () => {
            var workItem = new WorkItem<T> { Data = x };
            await innerBlock.SendAsync(workItem);
            await workItem.Completion;
        });
    });
}

为了测试它,我创建了一个带有等待重试策略和一个虚拟方法的块,该方法在前 3 次调用(应用程序范围)时抛出异常。然后我给它一些数据:

"a", "b", "c", "d", "e", "f"

我希望 a、b 和 c 失败并排在最后。但我观察到他们按以下顺序击中内部块的动作:

"a", "a", "a", "a", "b", "c", "d", "e", "f"

从本质上讲,我没有满足自己的要求,而且很容易看出原因:在当前项目的所有重试发生之前,外部块不会让新项目进入。一个简单但看似 hackish 的解决方案是为MaxDegreeOfParallelism外部块添加一个大值:

return new ActionBlock<T>(async x => {
    await policy.ExecuteAsync(async () => {
        var workItem = new WorkItem<T> { Data = x };
        await innerBlock.SendAsync(workItem);
        await workItem.Completion;
    });
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 100 });

通过这种更改,我观察到新项目确实会在重试之前进入,但我也引发了一些混乱。进入内部块已经成为随机的,尽管前 3 项总是在最后:

"a", "e", "b", "c", "d", "a", "e", "b"

所以这个稍微好一点。但理想情况下,我希望看到订单保留:

"a", "b", "c", "d", "e", "a", "b", "c"

这就是我卡住的地方,对此进行推理我想知道在这些限制下是否有可能,特别是内部CreatePollyBlock可以执行策略但不能定义它。例如,如果这些内部可以提供重试 lambda,我认为这会给我更多的选择。但这是政策定义的一部分,根据这个设计,我不能这样做。

提前感谢您的帮助。

4

0 回答 0