1

我正在使用 Interval 运算符,即使我的管道发生异常,我也想继续发出项目。

所以我尝试onErrorResumeNext在出现异常的情况下使用发射项目。但是我看到在发出这个项目之后,间隔停止发出更多的项目。

这是我的单元测试。

@Test
public void testIntervalObservableWithError() {
    Subscription subscription = Observable.interval(50, TimeUnit.MILLISECONDS)
            .map(time -> "item\n")
            .map(item -> item = null)
            .map(String::toString)
            .onErrorResumeNext(t-> Observable.just("item with error emitted"))
            .subscribe(System.out::print, t->{
                        System.out.println(t);
                    }
                   );
    TestSubscriber testSubscriber = new TestSubscriber((Observer) subscription);
    testSubscriber.awaitTerminalEvent(20000, TimeUnit.MILLISECONDS);
}

我对这种行为感到困惑,为什么 observable 取消订阅,如果它收到来自onErrorResumeNext

解决方案:

经过一些解释,我意识到当发生错误时,可观察到的 t 就完成了。所以我最终将可能有异常的可观察对象包装到另一个可观察对象中,并且我正在使用 flatMap。于是主 Observable 继续发射项目。

@Test
public void testIntervalObservableWithError() {
    Observable.interval(100, TimeUnit.MILLISECONDS)
            .map(time -> "item\n")
            .flatMap(item -> Observable.just(item)
                    .map(String::toString))
            .subscribe(System.out::print); 
    TestSubscriber testSubscriber = new TestSubscriber();
    testSubscriber.awaitTerminalEvent(5000, TimeUnit.MILLISECONDS);
}

如果有任何操作员可以做我想知道的所有魔法。

研究生

4

3 回答 3

1

RxJava 流使用的约定是,一旦发出错误,就不应再发出任何项目。如果您的用例要求流在错误之后继续,那么您还需要将错误转换为onNext发射。创建一个包装器类型说ValueOrError<T>并开始思考Observable<ValueOrError<T>>

Observable<Integer> source = ...
Observable<ValueOrError<Integer>> o = 
  source.map(x -> {
    try { 
      return new ValueOrError<>(mightThrow(x)); 
    }
    catch (Throwable e) {
      return new ValueOrError<>(e);
    });
于 2016-11-17T23:25:51.330 回答
1

您的订阅中断,因为当onErrorResumeNext触发时,您的上游已经完成并出现错误。而且您只需发出一个项目,而不是让异常顺流而下。为了让上游保持活力,您必须防止在其上抛出异常。

对于您的特定示例解决方案可以是这样的:

...
    .map(time -> "item\n")
    .map(item -> item = null)
    .map(item -> {
        try {
            return item.toString();
        } catch (NullPointerException e) {
            return "item with error emitted";
    })
    //no onErrorResumeNext()
    .subscribe ...

onErrorResumeNext只是用一个项目替换错误并调用onComplete.

于 2016-11-18T00:57:49.730 回答
1

这是一个使用 onErrorResumeNext 在出错时继续调用 http 服务的工作示例。花了几个小时找到它应该如何工作(RxJs 的新手),但最终从@Maksim Ostrovidov 的评论中理解了它,所以把它贴在这里。您可以通过在开发工具“网络选项卡”中切换离线复选框来在 chrome 中对其进行测试

var interval = 1000;
timer(0, interval).pipe(
          switchMap(_ => this.http.get('api/health').pipe(onErrorResumeNext()))
        ).subscribe(response => this.savedResponse = response);

于 2018-09-27T11:35:19.833 回答