这种回退重试实现的关键是延迟的 observables。在有人订阅它之前,延迟的 observable 不会执行它的工厂。它将为每个订阅调用工厂,使其成为我们重试场景的理想选择。
假设我们有一个触发网络请求的方法。
public IObservable<WebResponse> SomeApiMethod() { ... }
出于这个小片段的目的,让我们将 deferred 定义为source
var source = Observable.Defer(() => SomeApiMethod());
每当有人订阅源时,它都会调用 SomeApiMethod 并启动一个新的 Web 请求。失败时重试的天真方法是使用内置的 Retry 运算符。
source.Retry(4)
不过,这对 API 来说不是很好,也不是你想要的。我们需要在每次尝试之间延迟发起请求。一种方法是延迟订阅。
Observable.Defer(() => source.DelaySubscription(TimeSpan.FromSeconds(1))).Retry(4)
这并不理想,因为即使在第一次请求时它也会增加延迟,让我们解决这个问题。
int attempt = 0;
Observable.Defer(() => {
return ((++attempt == 1) ? source : source.DelaySubscription(TimeSpan.FromSeconds(1)))
})
.Retry(4)
.Select(response => ...)
只是暂停一秒钟并不是一个很好的重试方法,所以让我们将该常量更改为一个接收重试计数并返回适当延迟的函数。指数回退很容易实现。
Func<int, TimeSpan> strategy = n => TimeSpan.FromSeconds(Math.Pow(n, 2));
((++attempt == 1) ? source : source.DelaySubscription(strategy(attempt - 1)))
我们现在差不多完成了,我们只需要添加一种方法来指定我们应该重试哪些异常。让我们添加一个给定异常返回的函数,无论重试是否有意义,我们将其称为 retryOnError。
现在我们需要编写一些看起来很吓人的代码,但请耐心等待。
Observable.Defer(() => {
return ((++attempt == 1) ? source : source.DelaySubscription(strategy(attempt - 1)))
.Select(item => new Tuple<bool, WebResponse, Exception>(true, item, null))
.Catch<Tuple<bool, WebResponse, Exception>, Exception>(e => retryOnError(e)
? Observable.Throw<Tuple<bool, WebResponse, Exception>>(e)
: Observable.Return(new Tuple<bool, WebResponse, Exception>(false, null, e)));
})
.Retry(retryCount)
.SelectMany(t => t.Item1
? Observable.Return(t.Item2)
: Observable.Throw<T>(t.Item3))
所有这些尖括号都是为了编组一个我们不应该重试的异常.Retry()
。我们将内部 observable 设置为IObservable<Tuple<bool, WebResponse, Exception>>
第一个 bool 指示我们是否有响应或异常的地方。如果 retryOnError 指示我们应该重试特定异常,则内部 observable 将抛出并且将被重试拾取。SelectMany 只是解开我们的 Tuple 并使结果 observableIObservable<WebRequest>
再次出现。
请参阅我的要点以及最终版本的完整源代码和测试。有了这个运算符,我们可以非常简洁地编写重试代码
Observable.Defer(() => SomApiMethod())
.RetryWithBackoffStrategy(
retryCount: 4,
retryOnError: e => e is ApiRetryWebException
)