0

问题是关于 RxJava2。

Throwable请注意,来自retryWhenwith 的zipping会在应用 zipper 功能之前range发出所有项目。Observable.range此外,range即使zipWith没有被调用,也会发出序列。例如这个源代码

Observable.create<String> {
        println("subscribing")
        it.onError(RuntimeException("always fails"))
    }
    .retryWhen {
        it.zipWith(Observable.range(1, 3).doOnNext { println("range $it") },
                BiFunction { t: Throwable, i: Int -> i })
                .flatMap {
                    System.out.println("delay retry by $it + second(s)")
                    Observable.timer(it.toLong(), TimeUnit.SECONDS)
                }
    }./*subscribe*/

给出以下结果

 range 1
 range 2
 range 3
 subscribing
 delay retry by 1 + second(s)
 subscribing
 delay retry by 2 + second(s)
 subscribing
 delay retry by 3 + second(s)
 subscribing
 onComplete

onError在创建中替换observable也不会消除发射range项目。所以问题是为什么它会像Range冷一样发生。

4

1 回答 1

1

Observable2.x 中的 s 没有背压,因此range操作员将尽快发出其所有项目。但是,您的情况可以使用随着重试处理程序的错误通知递增的普通计数器:

source.retryWhen(e -> {
    int[] counter = { 0 };
    return e.takeWhile(v -> ++counter[0] < 4)
            .flatMap(v -> Observable.timer(counter[0], TimeUnit.SECONDS));
})
于 2017-08-24T21:32:36.183 回答