1

我正在尝试使用 RX Java 来使用来自不断发送对象的源的一些数据。

我想知道如何针对我自己的代码引发异常的情况实施重试策略。例如,网络异常应触发具有指数退避策略的重试。

一些代码:

 message.map(this::processMessage)
                 .subscribe((message)->{
                     //do something after mapping
                 });

processMessage(message)是包含可能失败的风险代码的方法,它是我想要重试的代码部分,但我不想阻止可观察者使用源中的数据。

对此有什么想法吗?

4

3 回答 3

10
message
    .map(this::processMessage)
    .retryWhen(errors -> errors.flatMap(error -> {  
        if (error instanceof IOException) {
          return Observable.just(null);
        }
        // For anything else, don't retry
        return Observable.error(error);
     })
     .subscribe(
         System.out::println,
         error -> System.out.println("Error!")
     );

或抓住错误

message.map(this::processMessage)
           .onErrorReturn(error -> "Empty result")
           .subscribe((message)->{})

或处理错误

message
    .map(this::processMessage)
    .doOnError(throwable -> Log.e(TAG, "Throwable " + throwable.getMessage()))
    .subscribe(
         System.out::println,
         error -> System.out.println("Error!")
     );

未经测试,但 retryWhen 与 repeatWhen 不同,这不仅在 onComplete 中调用。

http://blog.danlew.net/2016/01/25/rxjavas-repeatwhen-and-retrywhen-explained/ -> 每个错误都是平面映射的,因此我们可以返回 onNext(null) (触发重新订阅)或 onError (错误)(避免重新订阅)。

退避政策:

source.retryWhen(errors ->  
  errors
    .zipWith(Observable.range(1, 3), (n, i) -> i)
    .flatMap(retryCount -> Observable.timer((long) Math.pow(5, retryCount), TimeUnit.SECONDS))
);

在这种情况下,flatMap + timer 比延迟更可取,因为它允许我们通过重试次数来修改延迟。上述重试 3 次,每次重试延迟 5 ^ retryCount,只需少量运算符即可为您提供指数退避!

于 2017-03-06T10:30:29.837 回答
0

最近我开发了完全适合您需求的库。

RetrofitRxErrorHandler

如果您将Exponential策略与您结合起来,backupObservable您将获得预期的结果。

于 2017-03-06T12:10:20.860 回答