54

我想知道如何忽略异常并继续无限流(在我的情况下是位置流)?

我正在获取当前用户位置(使用Android-ReactiveLocation),然后将它们发送到我的 API(使用Retrofit)。

在我的情况下,当网络调用(例如超时)期间发生异常时,调用onError方法并且流自行停止。如何避免?

活动:

private RestService mRestService;
private Subscription mSubscription;
private LocationRequest mLocationRequest = LocationRequest.create()
            .setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY)
            .setInterval(100);
...
private void start() {
    mRestService = ...;
    ReactiveLocationProvider reactiveLocationProvider = new ReactiveLocationProvider(this);
    mSubscription = reactiveLocationProvider.getUpdatedLocation(mLocationRequest)
            .buffer(50)
            .flatMap(locations -> mRestService.postLocations(locations)) // can throw exception
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe();
}

休息服务:

public interface RestService {
    @POST("/.../")
    Observable<Response> postLocations(@Body List<Location> locations);
}
4

11 回答 11

71

您可能想要使用错误处理运算符之一。

  • onErrorResumeNext( )— 指示 Observable 在遇到错误时发出一系列项目
  • onErrorReturn( )— 指示 Observable 在遇到错误时发出特定项目
  • onExceptionResumeNext( )— 指示 Observable 在遇到异常后继续发射项目(但不是另一种 throwable)
  • retry( )— 如果源 Observable 发出错误,请重新订阅它,希望它能够正确完成
  • retryWhen( )— 如果源 Observable 发出错误,则将该错误传递给另一个 Observable 以确定是否重新订阅源

特别是在你的情况retryonExceptionResumeNext看起来很有希望。

于 2015-03-10T18:15:38.560 回答
27

mRestService.postLocations(locations)发出一项,然后完成。如果发生错误,则它会发出错误,从而完成流。

当您在 a 中调用此方法时flatMap,错误会继续出现在您的“主”流中,然后您的流停止。

您可以做的是将您的错误转换为另一个项目(如此处所述:https ://stackoverflow.com/a/28971140/476690 ),但不是在您的主流上(我认为您已经尝试过),而是在mRestService.postLocations(locations).

这样,此调用将发出一个错误,该错误将被转换为一个项目/另一个可观察对象,然后完成。(不打电话onError)。

在消费者视图上,mRestService.postLocations(locations)将发出一个项目,然后完成,就像一切都成功一样。

mSubscription = reactiveLocationProvider.getUpdatedLocation(mLocationRequest)
        .buffer(50)
        .flatMap(locations -> mRestService.postLocations(locations).onErrorReturn((e) -> Collections.emptyList()) // can't throw exception
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe();
于 2015-03-11T09:58:01.740 回答
20

如果您只想忽略内部的错误flatMap 而不返回元素,请执行以下操作:

flatMap(item -> 
    restService.getSomething(item).onErrorResumeNext(Observable.empty())
);
于 2017-08-22T11:06:06.163 回答
7

只需粘贴来自@MikeN 的答案的链接信息,以防它丢失:

import rx.Observable.Operator;
import rx.functions.Action1;

public final class OperatorSuppressError<T> implements Operator<T, T> {
    final Action1<Throwable> onError;

    public OperatorSuppressError(Action1<Throwable> onError) {
        this.onError = onError;
    }

    @Override
    public Subscriber<? super T> call(final Subscriber<? super T> t1) {
        return new Subscriber<T>(t1) {

            @Override
            public void onNext(T t) {
                t1.onNext(t);
            }

            @Override
            public void onError(Throwable e) {
                onError.call(e);
            }

            @Override
            public void onCompleted() {
                t1.onCompleted();
            }

        };
    }
}

并在可观察源附近使用它,因为其他运营商可能会在此之前急切地取消订阅。

Observerable.create(connectToUnboundedStream()).lift(new OperatorSuppressError(log()).doOnNext(someStuff()).subscribe();

但是请注意,这会抑制来自源的错误传递。如果链中的任何 onNext 在它引发异常之后,它仍然可能会取消订阅源。

于 2015-11-04T21:45:29.023 回答
2

这个答案可能有点晚了,但如果有人偶然发现这一点,可以使用 Jacke Wharton 的随时可用的 Relay 库,而不是重新发明轮子

https://github.com/JakeWharton/RxRelay

有很好的文档,但本质上,中继是一个主题,除了不能调用 onComplete 或 onError。

选项是:

行为继电器

Relay that emits the most recent item it has observed and all subsequent observed items to each subscribed Observer.
    // observer will receive all events.
    BehaviorRelay<Object> relay = BehaviorRelay.createDefault("default");
    relay.subscribe(observer);
    relay.accept("one");
    relay.accept("two");
    relay.accept("three");

    // observer will receive the "one", "two" and "three" events, but not "zero"
    BehaviorRelay<Object> relay = BehaviorRelay.createDefault("default");
    relay.accept("zero");
    relay.accept("one");
    relay.subscribe(observer);
    relay.accept("two");
    relay.accept("three");

PublishRelay 中继,一旦观察者订阅,将所有后续观察到的项目发送给订阅者。

    PublishRelay<Object> relay = PublishRelay.create();
    // observer1 will receive all events
    relay.subscribe(observer1);
    relay.accept("one");
    relay.accept("two");
    // observer2 will only receive "three"
    relay.subscribe(observer2);
    relay.accept("three");

ReplayRelay 缓存它观察到的所有项目并将它们重播到任何订阅的观察者的中继。

    ReplayRelay<Object> relay = ReplayRelay.create();
    relay.accept("one");
    relay.accept("two");
    relay.accept("three");
    // both of the following will get the events from above
    relay.subscribe(observer1);
    relay.subscribe(observer2);
于 2020-10-02T12:40:36.210 回答
1

尝试在 Observable.defer 调用中调用其余服务。这样,对于每次调用,您将有机会使用它自己的“onErrorResumeNext”,并且错误不会导致您的主流完成。

reactiveLocationProvider.getUpdatedLocation(mLocationRequest)
  .buffer(50)
  .flatMap(locations ->
    Observable.defer(() -> mRestService.postLocations(locations))
      .onErrorResumeNext(<SOME_DEFAULT_TO_REACT_TO>)
  )
........

该解决方案最初来自该线程-> RxJava Observable and Subscriber for skipping exception?,但我认为它也适用于您的情况。

于 2015-03-11T08:20:18.350 回答
1

为这个问题添加我的解决方案:

privider
    .compose(ignoreErrorsTransformer)
    .subscribe()

private final Observable.Transformer<ResultType, ResultType> ignoreErrorsTransformer =
        new Observable.Transformer<ResultType, ResultType>() {
            @Override
            public Observable<ResultType> call(Observable<ResultType> resultTypeObservable) {
                return resultTypeObservable
                        .materialize()
                        .filter(new Func1<Notification<ResultType>, Boolean>() {
                            @Override
                            public Boolean call(Notification<ResultType> resultTypeNotification) {
                                return !resultTypeNotification.isOnError();
                            }
                        })
                        .dematerialize();

            }
        };
于 2016-07-05T14:19:10.663 回答
0

使用 Rxjava2,我们可以调用带有 delayErrors 参数的重载平面图: flatmap javadoc

将其传递为 true 时:

当前 Flowable 和所有内部 Publisher 的异常被延迟,直到它们全部终止,如果为 false,第一个发出异常信号的异常将立即终止整个序列

于 2020-06-04T16:23:41.543 回答
0

您可以使用 onErrorComplete() 方法跳过错误

mSubscription = reactiveLocationProvider.getUpdatedLocation(mLocationRequest)
    .buffer(50)
    .flatMapMaybe(locations -> Maybe.just(mRestService.postLocations(locations).onErrorComplete()) // skip item
    .subscribeOn(Schedulers.newThread())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe();
于 2021-05-31T10:26:42.537 回答
0

对解决方案 (@MikeN) 稍作修改,以使有限流能够完成:

import rx.Observable.Operator;
import rx.functions.Action1;

public final class OperatorSuppressError<T> implements Operator<T, T> {
    final Action1<Throwable> onError;

    public OperatorSuppressError(Action1<Throwable> onError) {
        this.onError = onError;
    }

    @Override
    public Subscriber<? super T> call(final Subscriber<? super T> t1) {
        return new Subscriber<T>(t1) {

            @Override
            public void onNext(T t) {
                t1.onNext(t);
            }

            @Override
            public void onError(Throwable e) {
                onError.call(e);
                //this will allow finite streams to complete
                t1.onCompleted();
            }

            @Override
            public void onCompleted() {
                t1.onCompleted();
            }

        };
    }
}
于 2015-12-15T04:10:03.427 回答
0

这是我忽略错误的 kotlin 扩展函数

fun <T> Observable<T>.ignoreErrors(errorHandler: (Throwable) -> Unit) =
    retryWhen { errors ->
        errors
            .doOnNext { errorHandler(it) }
            .map { 0 }
    }

retryWhen用于无限期地重新订阅上游,同时仍然允许您以非终端方式处理错误。

这感觉很危险

于 2019-11-21T04:26:12.537 回答