我正在使用 Interval 运算符,即使我的管道发生异常,我也想继续发出项目。
所以我尝试onErrorResumeNext
在出现异常的情况下使用发射项目。但是我看到在发出这个项目之后,间隔停止发出更多的项目。
这是我的单元测试。
@Test
public void testIntervalObservableWithError() {
Subscription subscription = Observable.interval(50, TimeUnit.MILLISECONDS)
.map(time -> "item\n")
.map(item -> item = null)
.map(String::toString)
.onErrorResumeNext(t-> Observable.just("item with error emitted"))
.subscribe(System.out::print, t->{
System.out.println(t);
}
);
TestSubscriber testSubscriber = new TestSubscriber((Observer) subscription);
testSubscriber.awaitTerminalEvent(20000, TimeUnit.MILLISECONDS);
}
我对这种行为感到困惑,为什么 observable 取消订阅,如果它收到来自onErrorResumeNext
解决方案:
经过一些解释,我意识到当发生错误时,可观察到的 t 就完成了。所以我最终将可能有异常的可观察对象包装到另一个可观察对象中,并且我正在使用 flatMap。于是主 Observable 继续发射项目。
@Test
public void testIntervalObservableWithError() {
Observable.interval(100, TimeUnit.MILLISECONDS)
.map(time -> "item\n")
.flatMap(item -> Observable.just(item)
.map(String::toString))
.subscribe(System.out::print);
TestSubscriber testSubscriber = new TestSubscriber();
testSubscriber.awaitTerminalEvent(5000, TimeUnit.MILLISECONDS);
}
如果有任何操作员可以做我想知道的所有魔法。
研究生