97

I am using RxJava in my Android app to handle network requests asynchronously. Now I would like to retry a failed network request only after a certain time has passed.

Is there any way to use retry() on an Observable but to retry only after a certain delay?

Is there a way to let the Observable know that it is currently being retried (as opposed to tried for the first time)?

I had a look at debounce()/throttleWithTimeout() but they seem to be doing something different.

Edit:

I think I found one way to do it, but I'd be interested in either confirmation that this is the correct way to do it or for other, better ways.

What I am doing is this: in the call() method of my Observable.OnSubscribe, before I call the Subscriber's onError() method, I simply let the thread sleep for the desired amount of time. So, to retry every 1000 milliseconds, I do something like this:

@Override
public void call(Subscriber<? super List<ProductNode>> subscriber) {
    try {
        Log.d(TAG, "trying to load all products with pid: " + pid);
        subscriber.onNext(productClient.getProductNodesForParentId(pid));
        subscriber.onCompleted();
    } catch (Exception e) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
        subscriber.onError(e);
    }
}

Since this method is running on an IO thread anyway it does not block the UI. The only problem I can see is that even the first error is reported with delay so the delay is there even if there's no retry(). I'd like it better if the delay wasn't applied after an error but instead before a retry (but not before the first try, obviously).
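
For comparison, the behaviour asked for here (wait before each retry, but not before the first attempt and not after the final failure) can be sketched in plain blocking Java without any RxJava operators. This is only an illustration under assumptions: DelayedRetry, its call method, and the maxAttempts and delayMillis parameters are hypothetical names, not part of RxJava or of the code above.

```java
import java.util.concurrent.Callable;

// Hypothetical helper (not an RxJava API): retries a blocking call and
// sleeps only between attempts.
final class DelayedRetry {
    static <T> T call(Callable<T> task, int maxAttempts, long delayMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (attempt > 1) {
                // The delay precedes a retry, never the first attempt.
                Thread.sleep(delayMillis);
            }
            try {
                return task.call();
            } catch (Exception e) {
                last = e; // remember the failure and try again if attempts remain
            }
        }
        // All attempts failed: report the last error without any further delay.
        throw last;
    }
}
```

A call such as DelayedRetry.call(() -> productClient.getProductNodesForParentId(pid), 3, 1000) would then report the final error immediately after the third failed attempt.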

4

17 Answers

179

You can use the retryWhen() operator to add retry logic to any Observable.

The following class contains the retry logic:

RxJava 2.x

public class RetryWithDelay implements Function<Observable<? extends Throwable>, Observable<?>> {
    private final int maxRetries;
    private final int retryDelayMillis;
    private int retryCount;

    public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
        this.maxRetries = maxRetries;
        this.retryDelayMillis = retryDelayMillis;
        this.retryCount = 0;
    }

    @Override
    public Observable<?> apply(final Observable<? extends Throwable> attempts) {
        return attempts
                .flatMap(new Function<Throwable, Observable<?>>() {
                    @Override
                    public Observable<?> apply(final Throwable throwable) {
                        if (++retryCount < maxRetries) {
                            // When this Observable calls onNext, the original
                            // Observable will be retried (i.e. re-subscribed).
                            return Observable.timer(retryDelayMillis,
                                    TimeUnit.MILLISECONDS);
                        }

                        // Max retries hit. Just pass the error along.
                        return Observable.error(throwable);
                    }
                });
    }
}

RxJava 1.x

public class RetryWithDelay implements
        Func1<Observable<? extends Throwable>, Observable<?>> {

    private final int maxRetries;
    private final int retryDelayMillis;
    private int retryCount;

    public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
        this.maxRetries = maxRetries;
        this.retryDelayMillis = retryDelayMillis;
        this.retryCount = 0;
    }

    @Override
    public Observable<?> call(Observable<? extends Throwable> attempts) {
        return attempts
                .flatMap(new Func1<Throwable, Observable<?>>() {
                    @Override
                    public Observable<?> call(Throwable throwable) {
                        if (++retryCount < maxRetries) {
                            // When this Observable calls onNext, the original
                            // Observable will be retried (i.e. re-subscribed).
                            return Observable.timer(retryDelayMillis,
                                    TimeUnit.MILLISECONDS);
                        }

                        // Max retries hit. Just pass the error along.
                        return Observable.error(throwable);
                    }
                });
    }
}

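Usage:

// Add retry logic to an existing observable.
// With the `++retryCount < maxRetries` check above, this allows up to
// 3 attempts in total (2 retries), with a 2-second delay before each retry.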
observable
    .retryWhen(new RetryWithDelay(3, 2000));
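
To see how many retries the `++retryCount < maxRetries` check in RetryWithDelay actually permits, the counter can be replayed in isolation. This is a plain-Java sketch with no RxJava dependency; RetryCounter and retriesAllowed are hypothetical names used only for illustration, and the source is assumed to fail on every attempt.

```java
// Hypothetical illustration: replays the counter logic of RetryWithDelay
// for a source that fails on every attempt.
final class RetryCounter {
    // Returns how many times the source would be re-subscribed.
    static int retriesAllowed(int maxRetries) {
        int retryCount = 0;
        int retries = 0;
        // Same condition as in RetryWithDelay: pre-increment, then compare.
        while (++retryCount < maxRetries) {
            retries++;
        }
        return retries;
    }

    public static void main(String[] args) {
        System.out.println(retriesAllowed(3)); // prints 2
    }
}
```

If maxRetries is meant to count retries rather than attempts, comparing with `<=` instead of `<` would allow exactly maxRetries retries.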
answered 2014-08-13T17:46:12.670
23

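Inspired by Paul's answer, and if you are not concerned about the retryWhen problems described by Abhijit Sarkar, the simplest way to delay resubscription unconditionally with RxJava 2 is:

source.retryWhen(throwables -> throwables.delay(1, TimeUnit.SECONDS))

You may want to look at more samples and explanations of retryWhen and repeatWhen.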

answered 2017-03-14T12:46:55.593
16

This example works with RxJava 2.2.2:

Retry without delay:

Single.just(somePayloadData)
   .map(data -> someConnection.send(data))
   .retry(5)
   .doOnSuccess(status -> log.info("Yay! {}", status));

Retry with delay:

Single.just(somePayloadData)
   .map(data -> someConnection.send(data))
   .retryWhen((Flowable<Throwable> f) -> f.take(5).delay(300, TimeUnit.MILLISECONDS))
   .doOnSuccess(status -> log.info("Yay! {}", status))
   .doOnError((Throwable error) 
                -> log.error("I tried five times with a 300ms break" 
                             + " delay in between. But it was in vain."));

Our source Single fails if someConnection.send() fails. When that happens, the Flowable of failures inside retryWhen emits the error. We delay that emission by 300 ms and send it back to signal a retry. take(5) guarantees that our signaling Flowable terminates after we have received five errors. retryWhen sees the termination and does not retry after the fifth failure.

answered 2018-10-09T14:04:51.970
9

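This is a solution based on Ben Christensen's snippets that I saw, RetryWhen Example and RetryWhenTestsConditional (I had to change n.getThrowable() to n for it to work). I used evant/gradle-retrolambda to make the lambda notation work on Android, but you don't have to use lambdas (although it is highly recommended). For the delay I implemented exponential back-off, but you can plug in whatever back-off logic you want there. For completeness I added the subscribeOn and observeOn operators. I'm using ReactiveX/RxAndroid for AndroidSchedulers.mainThread().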

int ATTEMPT_COUNT = 10;

public class Tuple<X, Y> {
    public final X x;
    public final Y y;

    public Tuple(X x, Y y) {
        this.x = x;
        this.y = y;
    }
}


observable
    .subscribeOn(Schedulers.io())
    .retryWhen(
            attempts -> {
                return attempts.zipWith(Observable.range(1, ATTEMPT_COUNT + 1), (n, i) -> new Tuple<Throwable, Integer>(n, i))
                .flatMap(
                        ni -> {
                            if (ni.y > ATTEMPT_COUNT)
                                return Observable.error(ni.x);
                            return Observable.timer((long) Math.pow(2, ni.y), TimeUnit.SECONDS);
                        });
            })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(subscriber);
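
The waiting times produced by `Math.pow(2, ni.y)` grow quickly, so it can help to look at the schedule on its own. The following plain-Java sketch lists the delay before each retry; BackoffSchedule, delaySeconds, schedule and the capSeconds parameter are hypothetical names, and the cap is an assumption added for illustration rather than part of the answer above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the exponential back-off schedule.
final class BackoffSchedule {
    // Delay in seconds before retry number `attempt` (1-based): 2^attempt,
    // limited to capSeconds. The cap is an assumption, not in the answer above.
    static long delaySeconds(int attempt, long capSeconds) {
        long uncapped = 1L << Math.min(attempt, 62); // keep the shift within long range
        return Math.min(uncapped, capSeconds);
    }

    // Delays for retries 1..attemptCount, in order.
    static List<Long> schedule(int attemptCount, long capSeconds) {
        List<Long> delays = new ArrayList<>();
        for (int attempt = 1; attempt <= attemptCount; attempt++) {
            delays.add(delaySeconds(attempt, capSeconds));
        }
        return delays;
    }

    public static void main(String[] args) {
        System.out.println(schedule(10, Long.MAX_VALUE)); // uncapped, as in the answer
        System.out.println(schedule(10, 60));             // capped at one minute
    }
}
```

With ATTEMPT_COUNT = 10 and no cap, the delays add up to 2046 seconds (about 34 minutes), which is why a cap may be worth considering for user-facing requests.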
answered 2014-11-23T22:09:09.153
8

Instead of using MyRequestObservable.retry, I use a wrapper function retryObservable(MyRequestObservable, retrycount, seconds) that returns a new Observable which handles the indirection for the delay, so I can do:

retryObservable(restApi.getObservableStuff(), 3, 30)
    .subscribe(new Action1<BonusIndividualList>(){
        @Override
        public void call(BonusIndividualList arg0) 
        {
            //success!
        }
    }, 
    new Action1<Throwable>(){
        @Override
        public void call(Throwable arg0) { 
           // failed after the 3 retries !
        }}); 


// wrapper code
private static <T> Observable<T> retryObservable(
        final Observable<T> requestObservable, final int nbRetry,
        final long seconds) {

    return Observable.create(new Observable.OnSubscribe<T>() {

        @Override
        public void call(final Subscriber<? super T> subscriber) {
            requestObservable.subscribe(new Action1<T>() {

                @Override
                public void call(T arg0) {
                    subscriber.onNext(arg0);
                    subscriber.onCompleted();
                }
            },

            new Action1<Throwable>() {
                @Override
                public void call(Throwable error) {

                    if (nbRetry > 0) {
                        Observable.just(requestObservable)
                                .delay(seconds, TimeUnit.SECONDS)
                                .observeOn(mainThread())
                                .subscribe(new Action1<Observable<T>>(){
                                    @Override
                                    public void call(Observable<T> observable){
                                        retryObservable(observable,
                                                nbRetry - 1, seconds)
                                                .subscribe(subscriber);
                                    }
                                });
                    } else {
                        // still fail after retries
                        subscriber.onError(error);
                    }

                }
            });

        }

    });

}
answered 2014-04-18T16:06:48.130
8

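Based on kjones' answer, here is a Kotlin version of RxJava 2.x retry with a delay, written as an extension function. Replace Observable with Flowable to create the same extension for Flowable.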

fun <T> Observable<T>.retryWithDelay(maxRetries: Int, retryDelayMillis: Int): Observable<T> {
    var retryCount = 0

    return retryWhen { thObservable ->
        thObservable.flatMap { throwable ->
            if (++retryCount < maxRetries) {
                Observable.timer(retryDelayMillis.toLong(), TimeUnit.MILLISECONDS)
            } else {
                Observable.error(throwable)
            }
        }
    }
}

Then use it on an observable: observable.retryWithDelay(3, 1000)

answered 2019-08-30T16:14:46.150
7

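retryWhen is a complicated, perhaps even buggy, operator. The official documentation and at least one answer here use the range operator, which will fail if there are no retries to be made. See my discussion with ReactiveX member David Karnok.

I improved on kjones' answer by changing flatMap to concatMap and by adding a RetryDelayStrategy class. flatMap does not preserve the order of emission while concatMap does, which is important for delays with back-off. RetryDelayStrategy, as the name indicates, lets the user choose from various modes of generating retry delays, including back-off. The code is available on my GitHub, complete with the following test cases:

  1. Succeeds on the 1st attempt (no retries)
  2. Fails after 1 retry
  3. Attempts to retry 3 times but succeeds on the 2nd, hence does not retry a 3rd time
  4. Succeeds on the 3rd retry

See the setRandomJokes method.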

answered 2016-07-18T05:23:34.360
5

Same answer as kjones', but updated to the latest version. For RxJava 2.x ('io.reactivex.rxjava2:rxjava:2.1.3'):

public class RetryWithDelay implements Function<Flowable<Throwable>, Publisher<?>> {

    private final int maxRetries;
    private final long retryDelayMillis;
    private int retryCount;

    public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
        this.maxRetries = maxRetries;
        this.retryDelayMillis = retryDelayMillis;
        this.retryCount = 0;
    }

    @Override
    public Publisher<?> apply(Flowable<Throwable> throwableFlowable) throws Exception {
        return throwableFlowable.flatMap(new Function<Throwable, Publisher<?>>() {
            @Override
            public Publisher<?> apply(Throwable throwable) throws Exception {
                if (++retryCount < maxRetries) {
                    // When this Observable calls onNext, the original
                    // Observable will be retried (i.e. re-subscribed).
                    return Flowable.timer(retryDelayMillis,
                            TimeUnit.MILLISECONDS);
                }

                // Max retries hit. Just pass the error along.
                return Flowable.error(throwable);
            }
        });
    }
}

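Usage:

// Add retry logic to an existing observable.
// With the `++retryCount < maxRetries` check above, this allows up to
// 3 attempts in total (2 retries), with a 2-second delay before each retry.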

observable
    .retryWhen(new RetryWithDelay(3, 2000));
answered 2017-09-27T12:55:00.277
3

Now, with RxJava version 1.0+, you can use zipWith to achieve retry with delay.

This adds modifications to kjones' answer.

Modified:

public class RetryWithDelay implements 
                            Func1<Observable<? extends Throwable>, Observable<?>> {

    private final int MAX_RETRIES;
    private final int DELAY_DURATION;
    private final int START_RETRY;

    /**
     * Provide number of retries and seconds to be delayed between retry.
     *
     * @param maxRetries             Number of retries.
     * @param delayDurationInSeconds Seconds to delay before each retry.
     */
    public RetryWithDelay(int maxRetries, int delayDurationInSeconds) {
        MAX_RETRIES = maxRetries;
        DELAY_DURATION = delayDurationInSeconds;
        START_RETRY = 1;
    }

    @Override
    public Observable<?> call(Observable<? extends Throwable> observable) {
        return observable
                .delay(DELAY_DURATION, TimeUnit.SECONDS)
                .zipWith(Observable.range(START_RETRY, MAX_RETRIES), 
                         new Func2<Throwable, Integer, Integer>() {
                             @Override
                             public Integer call(Throwable throwable, Integer attempt) {
                                  return attempt;
                             }
                         });
    }
}
answered 2016-09-13T17:57:17.640
1

You can add a delay in the Observable returned by the retryWhen operator:

/**
 * Here we can see how onErrorResumeNext works and emit an item in case that an error occur in the pipeline and an exception is propagated
 */
@Test
public void observableOnErrorResumeNext() {
    Subscription subscription = Observable.just(null)
                                          .map(Object::toString)
                                          .doOnError(failure -> System.out.println("Error:" + failure.getCause()))
                                          .retryWhen(errors -> errors.doOnNext(o -> count++)
                                                                     .flatMap(t -> count > 3 ? Observable.error(t) : Observable.just(null).delay(100, TimeUnit.MILLISECONDS)),
                                                     Schedulers.newThread())
                                          .onErrorResumeNext(t -> {
                                              System.out.println("Error after all retries:" + t.getCause());
                                              return Observable.just("I save the world for extinction!");
                                          })
                                          .subscribe(s -> System.out.println(s));
    new TestSubscriber((Observer) subscription).awaitTerminalEvent(500, TimeUnit.MILLISECONDS);
}

You can see more examples here: https://github.com/politrons/reactive

answered 2016-07-18T08:08:54.210
0

Kotlin and RxJava 1 version:

class RetryWithDelay(private val MAX_RETRIES: Int, private val DELAY_DURATION_IN_SECONDS: Long)
    : Function1<Observable<out Throwable>, Observable<*>> {

    private val START_RETRY: Int = 1

    override fun invoke(observable: Observable<out Throwable>): Observable<*> {
        return observable.delay(DELAY_DURATION_IN_SECONDS, TimeUnit.SECONDS)
            .zipWith(Observable.range(START_RETRY, MAX_RETRIES),
                object : Function2<Throwable, Int, Int> {
                    override fun invoke(throwable: Throwable, attempt: Int): Int {
                        return attempt
                    }
                })
    }
}
answered 2018-04-17T08:01:35.207
0

Use retryWhen:

     /**
     * Retry Handler Support
     * @param errors
     * @param predicate filter error 
     * @param maxTry
     * @param periodStrategy
     * @param timeUnit
     * @return 
     */
    private  Flowable<?> retrySupport(Flowable<Throwable> errors, Predicate<? super Throwable> predicate , Integer maxTry , Function<Long, Long> periodStrategy , TimeUnit timeUnit )
    {
        LongAdder errorCount = new LongAdder();
        return errors
                .doOnNext(e -> {
                    errorCount.increment();
                    long currentCount = errorCount.longValue();
                    boolean tryContinue = predicate.test(e) && currentCount < maxTry;
                    Logger.i("No. of errors: %d , %s",  currentCount,
                            tryContinue ? String.format("please wait %d %s.", periodStrategy.apply(currentCount), timeUnit.name()) : "skip and throw");
                    if(!tryContinue)
                        throw  e;
                } )
                .flatMapSingle(e -> Single.timer( periodStrategy.apply(errorCount.longValue()), timeUnit));
    }

Sample:

    private Single<DeviceInfo> getErrorToken( String device)
    {
        return Single.error(  new IOException( "network is disconnect!" ) );
    }

// Only retry when an IOException is emitted.
// Delays of 1s, 2s, 4s, 8s: with maxTry = 5 the fifth error is rethrown, so there are four retries.

this.getErrorToken( this.deviceCode )
     .retryWhen( error -> retrySupport( error, 
                 e-> e instanceof IOException,
                 5 , 
                 count-> (long)Math.pow(2,count-1),TimeUnit.SECONDS ) )
     .subscribe( deviceInfo1 -> Logger.i( "----Get Device Info---" ) ,
                 e -> Logger.e( e, "On Error" ) ,
                 () -> Logger.i("<<<<<no more>>>>>"));
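
The interaction between maxTry and periodStrategy in retrySupport is easy to misread, so here is a plain-Java replay of just the counting logic. It is a sketch under assumptions: RetryPlan and delaysUsed are hypothetical names, and the source is assumed to fail every time with an error that passes the predicate.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongUnaryOperator;

// Hypothetical illustration: mirrors the error counting in retrySupport.
final class RetryPlan {
    // Collects the delays that would be applied before the error is rethrown.
    static List<Long> delaysUsed(int maxTry, LongUnaryOperator periodStrategy) {
        List<Long> delays = new ArrayList<>();
        long errorCount = 0;
        while (true) {
            errorCount++;
            // Same check as `currentCount < maxTry`: once it fails, the error is rethrown.
            if (errorCount >= maxTry) {
                return delays;
            }
            delays.add(periodStrategy.applyAsLong(errorCount));
        }
    }

    public static void main(String[] args) {
        System.out.println(delaysUsed(5, count -> (long) Math.pow(2, count - 1))); // prints [1, 2, 4, 8]
    }
}
```

So maxTry bounds the number of attempts, and the number of delays applied is one less than that.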

answered 2021-04-20T03:08:18.030
0

Simply do it like this:

                  Observable.just("")
                            .delay(2, TimeUnit.SECONDS) //delay
                            .flatMap(new Func1<String, Observable<File>>() {
                                @Override
                                public Observable<File> call(String s) {
                                    L.from(TAG).d("postAvatar=");

                                    File file = PhotoPickUtil.getTempFile();
                                    if (file.length() <= 0) {
                                        throw new NullPointerException();
                                    }
                                    return Observable.just(file);
                                }
                            })
                            .retry(6)
                            .subscribe(new Action1<File>() {
                                @Override
                                public void call(File file) {
                                    postAvatar(file);
                                }
                            }, new Action1<Throwable>() {
                                @Override
                                public void call(Throwable throwable) {

                                }
                            });
answered 2016-07-04T13:05:21.513
0

(Kotlin) I slightly improved the code with exponential backoff and applied defensive emission of Observable.range():

fun testOnRetryWithDelayExponentialBackoff() {
    val interval = 1
    val maxCount = 3
    val ai = AtomicInteger(1);
    val source = Observable.create<Unit> { emitter ->
        val attempt = ai.getAndIncrement()
        println("Subscribe ${attempt}")
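        if (attempt >= maxCount) {
            emitter.onNext(Unit)
            emitter.onComplete()
        } else {
            // Signal an error only while the attempt is still below maxCount,
            // so onError is never called after onComplete.
            emitter.onError(RuntimeException("Test $attempt"))
        }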
    }

    // Below implementation of "retryWhen" function, remove all "println()" for real code.
    val sourceWithRetry: Observable<Unit> = source.retryWhen { throwableRx ->
        throwableRx.doOnNext({ println("Error: $it") })
                .zipWith(Observable.range(1, maxCount)
                        .concatMap { Observable.just(it).delay(0, TimeUnit.MILLISECONDS) },
                        BiFunction { t1: Throwable, t2: Int -> t1 to t2 }
                )
                .flatMap { pair ->
                    if (pair.second >= maxCount) {
                        Observable.error(pair.first)
                    } else {
                        val delay = interval * 2F.pow(pair.second)
                        println("retry delay: $delay")
                        Observable.timer(delay.toLong(), TimeUnit.SECONDS)
                    }
                }
    }

    //Code to print the result in terminal.
    sourceWithRetry
            .doOnComplete { println("Complete") }
            .doOnError({ println("Final Error: $it") })
            .blockingForEach { println("$it") }
}
answered 2018-05-07T19:01:22.523
0

If you need to print the retry count, you can use the example provided on the RxJava wiki page https://github.com/ReactiveX/RxJava/wiki/Error-Handling-Operators

observable.retryWhen(errors ->
    // Count and increment the number of errors.
    errors.map(error -> 1).scan((i, j) -> i + j)  
       .doOnNext(errorCount -> System.out.println(" -> query errors #: " + errorCount))
       // Limit the maximum number of retries.
       .takeWhile(errorCount -> errorCount < retryCounts)   
       // Signal resubscribe event after some delay.
       .flatMapSingle(errorCount -> Single.timer(errorCount, TimeUnit.SECONDS)));
answered 2019-04-27T14:33:16.237
0

This worked for me:

//retry with retryCount time after 1 sec of delay
observable.retryWhen(throwableFlowable -> {
                return throwableFlowable.take(retryCount).delay(1, TimeUnit.SECONDS);
            });
answered 2021-08-14T13:22:47.007
0

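I'm a bit late to this one, but in case it is still useful to someone, I created a Kotlin extension function for RxJava 2 that retries with an exponential backoff strategy: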

  private fun <T> Observable<T>.retryWithExponentialBackoff(): Observable<T> {
    val retriesSubject = BehaviorSubject.createDefault(0)
    return doOnNext { retriesSubject.onNext(0) }
        .retryWhen {
          it.withLatestFrom(retriesSubject) { _, retryCount ->
            retriesSubject.onNext(retryCount + 1)
            retryCount
          }.flatMap { retryCount ->
            when (retryCount) {
              MAX_RETRY_COUNT -> Observable.error(RuntimeException("Max number of retries reached"))
              else -> Observable.timer(2.0.pow(retryCount).toLong(), SECONDS)
            }
          }
        }
  }

answered 2021-11-24T11:25:13.460