所以我知道这已经被问过很多次了,但是我尝试了很多东西,但似乎没有任何效果。
让我们从这些博客/文章/代码开始:
- https://blog.danlew.net/2016/01/25/rxjavas-repeatwhen-and-retrywhen-explained/
- https://jimbaca.com/rxjava-retrywhen/
- http://blog.inching.org/RxJava/2016-12-12-rx-java-error-handling.html
- https://pamartinezandres.com/rxjava-2-exponential-backoff-retry-only-when-internet-is-available-5a46188ab175
- https://gist.github.com/wotomas/35006d156a16345349a2e4c8e159e122
还有许多其他人。
简而言之,它们都描述了如何使用 retryWhen 来实现指数退避。像这样的东西:
source
.retryWhen(
errors -> {
return errors
.zipWith(Observable.range(1, 3), (n, i) -> i)
.flatMap(
retryCount -> {
System.out.println("retry count " + retryCount);
return Observable.timer((long) Math.pow(1, retryCount), SECONDS);
});
})
甚至库中的文档也同意它: https ://github.com/ReactiveX/RxJava/blob/3.x/src/main/java/io/reactivex/rxjava3/core/Observable.java#L11919 。
但是,我已经尝试过这个和一些非常相似的变体,不值得在这里描述,而且似乎没有任何效果。有一种方法可以使示例起作用并且正在使用阻塞订阅者,但我想避免阻塞线程。
因此,如果对前一个 observable 应用一个阻塞订阅者,如下所示:
.blockingForEach(System.out::println);
它按预期工作。但因为那不是想法。如果我们尝试:
.subscribe(
x -> System.out.println("onNext: " + x),
Throwable::printStackTrace,
() -> System.out.println("onComplete"));
流程只运行一次,因此不是我想要实现的。
这是否意味着它不能像我尝试的那样使用?从文档来看,尝试完成我的要求似乎不是问题。
知道我错过了什么吗?
TIA。
编辑:我有两种测试方法:
一种测试方法(使用testng):
Observable<Integer> source =
Observable.just("test")
.map(
x -> {
System.out.println("trying again");
return Integer.parseInt(x);
});
source
.retryWhen(
errors -> {
return errors
.zipWith(Observable.range(1, 3), (n, i) -> i)
.flatMap(
retryCount -> {
return Observable.timer((long) Math.pow(1, retryCount), SECONDS);
});
})
.subscribe(...);
来自 Kafka 消费者(使用 Spring Boot):
这只是对观察者的订阅,但重试逻辑是我在帖子前面描述的。
@KafkaListener(topics = "${kafka.config.topic}")
public void receive(String payload) {
log.info("received payload='{}'", payload);
service
.updateMessage(payload)
.subscribe(...)
.dispose();
}