25

IntroToRx一书中,作者建议为 I/O 编写“智能”重试,在一段时间后重试 I/O 请求,如网络请求。

这是确切的段落:

添加到您自己的库的一个有用的扩展方法可能是“Back Off and Retry”方法。我合作过的团队发现这样的功能在执行 I/O 时很有用,尤其是网络请求。这个概念是尝试,失败时等待给定的时间段,然后再试一次。您的此方法版本可能会考虑您要重试的异常类型,以及重试的最大次数。您甚至可能希望延长等待时间,以减少每次后续重试时的积极性。

不幸的是,我不知道如何编写这个方法。:(

4

6 回答 6

40

这种回退重试实现的关键是延迟的 observables。在有人订阅它之前,延迟的 observable 不会执行它的工厂。它将为每个订阅调用工厂,使其成为我们重试场景的理想选择。

假设我们有一个触发网络请求的方法。

public IObservable<WebResponse> SomeApiMethod() { ... }

出于这个小片段的目的,让我们将 deferred 定义为source

var source = Observable.Defer(() => SomeApiMethod());

每当有人订阅源时,它都会调用 SomeApiMethod 并启动一个新的 Web 请求。失败时重试的天真方法是使用内置的 Retry 运算符。

source.Retry(4)

不过,这对 API 来说不是很好,也不是你想要的。我们需要在每次尝试之间延迟发起请求。一种方法是延迟订阅

Observable.Defer(() => source.DelaySubscription(TimeSpan.FromSeconds(1))).Retry(4)

这并不理想,因为即使在第一次请求时它也会增加延迟,让我们解决这个问题。

int attempt = 0;
Observable.Defer(() => { 
   return ((++attempt == 1)  ? source : source.DelaySubscription(TimeSpan.FromSeconds(1)))
})
.Retry(4)
.Select(response => ...)

只是暂停一秒钟并不是一个很好的重试方法,所以让我们将该常量更改为一个接收重试计数并返回适当延迟的函数。指数回退很容易实现。

Func<int, TimeSpan> strategy = n => TimeSpan.FromSeconds(Math.Pow(n, 2));

((++attempt == 1)  ? source : source.DelaySubscription(strategy(attempt - 1)))

我们现在差不多完成了,我们只需要添加一种方法来指定我们应该重试哪些异常。让我们添加一个给定异常返回的函数,无论重试是否有意义,我们将其称为 retryOnError。

现在我们需要编写一些看起来很吓人的代码,但请耐心等待。

Observable.Defer(() => {
    return ((++attempt == 1)  ? source : source.DelaySubscription(strategy(attempt - 1)))
        .Select(item => new Tuple<bool, WebResponse, Exception>(true, item, null))
        .Catch<Tuple<bool, WebResponse, Exception>, Exception>(e => retryOnError(e)
            ? Observable.Throw<Tuple<bool, WebResponse, Exception>>(e)
            : Observable.Return(new Tuple<bool, WebResponse, Exception>(false, null, e)));
})
.Retry(retryCount)
.SelectMany(t => t.Item1
    ? Observable.Return(t.Item2)
    : Observable.Throw<T>(t.Item3))

所有这些尖括号都是为了编组一个我们不应该重试的异常.Retry()。我们将内部 observable 设置为IObservable<Tuple<bool, WebResponse, Exception>>第一个 bool 指示我们是否有响应或异常的地方。如果 retryOnError 指示我们应该重试特定异常,则内部 observable 将抛出并且将被重试拾取。SelectMany 只是解开我们的 Tuple 并使结果 observableIObservable<WebRequest>再次出现。

请参阅我的要点以及最终版本的完整源代码和测试。有了这个运算符,我们可以非常简洁地编写重试代码

Observable.Defer(() => SomApiMethod())
  .RetryWithBackoffStrategy(
     retryCount: 4, 
     retryOnError: e => e is ApiRetryWebException
  )
于 2013-09-25T09:04:06.453 回答
13

也许我过度简化了这种情况,但如果我们看一下 Retry 的实现,它只是一个 Observable.Catch 对一个无限枚举的 observables:

private static IEnumerable<T> RepeatInfinite<T>(T value)
{
    while (true)
        yield return value;
}

public virtual IObservable<TSource> Retry<TSource>(IObservable<TSource> source)
{
    return Observable.Catch<TSource>(QueryLanguage.RepeatInfinite<IObservable<TSource>(source));
}

因此,如果我们采用这种方法,我们可以在第一次收益之后添加延迟。

private static IEnumerable<IObservable<TSource>> RepeateInfinite<TSource> (IObservable<TSource> source, TimeSpan dueTime)
{
    // Don't delay the first time        
    yield return source;

    while (true)
        yield return source.DelaySubscription(dueTime);
    }

public static IObservable<TSource> RetryAfterDelay<TSource>(this IObservable<TSource> source, TimeSpan dueTime)
{
    return RepeateInfinite(source, dueTime).Catch();
}

使用重试计数捕获特定异常的重载可以更加简洁:

public static IObservable<TSource> RetryAfterDelay<TSource, TException>(this IObservable<TSource> source, TimeSpan dueTime, int count) where TException : Exception
{
    return source.Catch<TSource, TException>(exception =>
    {
        if (count <= 0)
        {
            return Observable.Throw<TSource>(exception);
        }

        return source.DelaySubscription(dueTime).RetryAfterDelay<TSource, TException>(dueTime, --count);
    });
}

请注意,这里的重载是使用递归。乍一看,如果 count 是 Int32.MaxValue 之类的东西,StackOverflowException 似乎是可能的。但是,DelaySubscription 使用调度程序来运行订阅操作,因此堆栈溢出是不可能的(即使用“trampolining”)。我想这通过查看代码并不是很明显。我们可以通过将 DelaySubscription 重载中的调度程序显式设置为 Scheduler.Immediate 并传入 TimeSpan.Zero 和 Int32.MaxValue 来强制堆栈溢出。我们可以传入一个非即时调度器来更明确地表达我们的意图,例如:

return source.DelaySubscription(dueTime, TaskPoolScheduler.Default).RetryAfterDelay<TSource, TException>(dueTime, --count);

更新:添加了重载以接收特定的调度程序。

public static IObservable<TSource> RetryAfterDelay<TSource, TException>(
    this IObservable<TSource> source,
    TimeSpan retryDelay,
    int retryCount,
    IScheduler scheduler) where TException : Exception
{
    return source.Catch<TSource, TException>(
        ex =>
        {
            if (retryCount <= 0)
            {
                return Observable.Throw<TSource>(ex);
            }

            return
                source.DelaySubscription(retryDelay, scheduler)
                    .RetryAfterDelay<TSource, TException>(retryDelay, --retryCount, scheduler);
        });
} 
于 2013-11-22T12:46:58.163 回答
2

这是我正在使用的一个:

public static IObservable<T> DelayedRetry<T>(this IObservable<T> src, TimeSpan delay)
{
    Contract.Requires(src != null);
    Contract.Ensures(Contract.Result<IObservable<T>>() != null);

    if (delay == TimeSpan.Zero) return src.Retry();
    return src.Catch(Observable.Timer(delay).SelectMany(x => src).Retry());
}
于 2013-11-28T17:35:19.990 回答
2

根据马库斯的回答,我写了以下内容:

public static class ObservableExtensions
{
    private static IObservable<T> BackOffAndRetry<T>(
        this IObservable<T> source,
        Func<int, TimeSpan> strategy,
        Func<int, Exception, bool> retryOnError,
        int attempt)
    {
        return Observable
            .Defer(() =>
            {
                var delay = attempt == 0 ? TimeSpan.Zero : strategy(attempt);
                var s = delay == TimeSpan.Zero ? source : source.DelaySubscription(delay);

                return s
                    .Catch<T, Exception>(e =>
                    {
                        if (retryOnError(attempt, e))
                        {
                            return source.BackOffAndRetry(strategy, retryOnError, attempt + 1);
                        }
                        return Observable.Throw<T>(e);
                    });
            });
    }

    public static IObservable<T> BackOffAndRetry<T>(
        this IObservable<T> source,
        Func<int, TimeSpan> strategy,
        Func<int, Exception, bool> retryOnError)
    {
        return source.BackOffAndRetry(strategy, retryOnError, 0);
    }
}

我更喜欢它,因为

  • 它不修改attempts但使用递归。
  • 它不使用retries但将尝试次数传递给retryOnError
于 2017-01-21T22:41:27.347 回答
1

这是我在研究Rxx是如何做到的时提出的另一个稍微不同的实现。所以它在很大程度上是 Rxx 方法的缩减版。

签名与 Markus 的版本略有不同。您指定要重试的异常类型,延迟策略采用异常和重试计数,因此每次连续重试可能会有更长的延迟,等等。

我不能保证它是错误证明或最佳方法,但它似乎有效。

public static IObservable<TSource> RetryWithDelay<TSource, TException>(this IObservable<TSource> source, Func<TException, int, TimeSpan> delayFactory, IScheduler scheduler = null)
where TException : Exception
{
    return Observable.Create<TSource>(observer =>
    {
        scheduler = scheduler ?? Scheduler.CurrentThread;
        var disposable = new SerialDisposable();
        int retryCount = 0;

        var scheduleDisposable = scheduler.Schedule(TimeSpan.Zero,
        self =>
        {
            var subscription = source.Subscribe(
            observer.OnNext,
            ex =>
            {
                var typedException = ex as TException;
                if (typedException != null)
                {
                    var retryDelay = delayFactory(typedException, ++retryCount);
                    self(retryDelay);
                }
                else
                {
                    observer.OnError(ex);
                }
            },
            observer.OnCompleted);

            disposable.Disposable = subscription;
        });

        return new CompositeDisposable(scheduleDisposable, disposable);
    });
}
于 2013-09-26T04:07:16.570 回答
0

这是我想出的那个。

不想将单个重试的项目连接到一个序列中,而是在每次重试时将源序列作为一个整体发出 - 所以运算符返回一个IObservable<IObservable<TSource>>. 如果不希望这样做,可以简单地将其Switch()编回到一个序列中。

(背景:在我的用例中,源是一个热热序列,我GroupByUntil出现了一个关闭组的项目。如果在两次重试之间丢失了该项目,则该组永远不会关闭,从而导致内存泄漏。有一个序列序列允许仅对内部序列进行分组(或异常处理或...)。)

/// <summary>
/// Repeats <paramref name="source"/> in individual windows, with <paramref name="interval"/> time in between.
/// </summary>
public static IObservable<IObservable<TSource>> RetryAfter<TSource>(this IObservable<TSource> source, TimeSpan interval, IScheduler scheduler = null)
{
    if (scheduler == null) scheduler = Scheduler.Default;
    return Observable.Create<IObservable<TSource>>(observer =>
    {
        return scheduler.Schedule(self =>
        {
            observer.OnNext(Observable.Create<TSource>(innerObserver =>
            {
                return source.Subscribe(
                    innerObserver.OnNext,
                    ex => { innerObserver.OnError(ex); scheduler.Schedule(interval, self); },
                    () => { innerObserver.OnCompleted(); scheduler.Schedule(interval, self); });
            }));
        });
    });
}
于 2016-01-07T15:45:06.590 回答