27

我是 RxJava 的新手,但我正在将它集成到我正在从事的项目中以帮助我学习它。我遇到了一个关于最佳实践的问题。

我有一个关于如何处理onError以防止停止Observable处理的问题。

这是设置:

对于每个我想做 2 个或更多网络请求的用户 ID,我都有一个用户 ID 列表。如果用户 ID 的任何网络请求失败,则该用户 ID 将不会更新并且可以跳过。这不应阻止处理其他用户标识。我确实有一个解决方案,但它涉及嵌套订阅(参见第二个代码块)。我确实看到的一个问题是,如果每个呼叫失败,即使在检测到某个阈值数量失败后,也无法短路并阻止其余呼叫访问网络资源。

有一个更好的方法吗?

在传统代码中:

List<String> results = new ArrayList<String>();
for (String userId : userIds) {
    try {
        String info = getInfo(userId);  // can throw an GetInfoException
        String otherInfo = getOtherInfo(userId);  // can throw an GetOtherInfoException
        results.add(info + ", " + otherInfo);
    } catch (GetInfoException e) {
        log.error(e);
    } catch (GetOtherInfoException e) {
        log.error(e);
    }
}

问题:

伪代码:

userid -> network requests -> result 
1 -> a, b -> onNext(1[a ,b])
2 -> a, onError -> onError
3 -> a, b -> onNext(3[a, b])
4 -> a, b -> onNext(4[a, b])

以下是 userIds 列表和每 2 个信息请求的工作示例。如果你运行它,你会看到它会失败(见下面的源代码)

import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
import rx.util.functions.Func1;

public class TestMergeDelayError {

    public static Observable<String> getUserIds() {
        return Observable.from(new String[]{"1", "2", "3", "4", "5", "6"});
    }

    public static Observable<String> getInfo(final String prefix, final String integer, final String errorNumber) {
        Observable<String> observable = Observable.create(new OnSubscribeFunc<String>() {

            public Subscription onSubscribe(Observer<? super String> t1) {
                if (integer.contains(errorNumber)) {
                    t1.onError(new Exception());
                } else {
                    t1.onNext(prefix + integer);
                    t1.onCompleted();
                }
                return Subscriptions.empty();
            }
        });
        return observable;
    }

    public static void main(String[] args) {

        Observable<String> userIdObservable = getUserIds();
        Observable<String> t = userIdObservable.flatMap(new Func1<String, Observable<String>>() {

            public Observable<String> call(final String t1) {
                Observable<String> info1 = getInfo("1::: ", t1, "2");
                Observable<String> info2 = getInfo("2::: ",t1, "3");
                return Observable.mergeDelayError(info1, info2);
            }
        });

        t.subscribe(new Action1<String>() {

            public void call(String t1) {
                System.out.println(t1);
            }
        }, new Action1<Throwable>() {

            public void call(Throwable t1) {
                t1.printStackTrace();
            }
        },
        new Action0(){

            public void call() {
                System.out.println("onComplete");
            }

        });
    }
}

输出:

1::: 1
2::: 1
2::: 2
java.lang.Exception
        at TestMergeDelayError$1.onSubscribe(TestMergeDelayError.java:32)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.operators.OperationMergeDelayError$MergeDelayErrorObservable$ParentObserver.onNext(OperationMergeDelayError.java:266)
        at rx.operators.OperationMergeDelayError$MergeDelayErrorObservable$ParentObserver.onNext(OperationMergeDelayError.java:210)
        at rx.operators.OperationMergeDelayError$2.onSubscribe(OperationMergeDelayError.java:77)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.operators.OperationMergeDelayError$MergeDelayErrorObservable.onSubscribe(OperationMergeDelayError.java:171)
        at rx.operators.OperationMergeDelayError$1.onSubscribe(OperationMergeDelayError.java:64)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.operators.OperationMerge$MergeObservable$ParentObserver.onNext(OperationMerge.java:164)
        at rx.operators.OperationMerge$MergeObservable$ParentObserver.onNext(OperationMerge.java:116)
        at rx.operators.OperationMap$MapObservable$1.onNext(OperationMap.java:105)
        at rx.operators.SafeObserver.onNext(SafeObserver.java:102)
        at rx.operators.OperationToObservableIterable$ToObservableIterable.onSubscribe(OperationToObservableIterable.java:94)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.operators.OperationMap$MapObservable.onSubscribe(OperationMap.java:102)
        at rx.operators.OperationMap$2.onSubscribe(OperationMap.java:76)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.operators.OperationMerge$MergeObservable.onSubscribe(OperationMerge.java:106)
        at rx.operators.OperationMerge$1.onSubscribe(OperationMerge.java:56)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.Observable.protectivelyWrapAndSubscribe(Observable.java:320)
        at rx.Observable.subscribe(Observable.java:483)

嵌套订阅解决方案:

import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Action1;
import rx.util.functions.Func1;

public class TestMergeDelayError {

    public static Observable<String> getUserIds() {
        return Observable.from(new String[]{"1", "2", "3", "4", "5", "6"});
    }

    public static Observable<String> getInfo(final String prefix, final String integer, final String errorNumber) {
        Observable<String> observable = Observable.create(new OnSubscribeFunc<String>() {

            public Subscription onSubscribe(Observer<? super String> t1) {
                if (integer.contains(errorNumber)) {
                    t1.onError(new Exception());
                } else {
                    t1.onNext(prefix + integer);
                    t1.onCompleted();
                }
                return Subscriptions.empty();
            }
        });
        return observable;
    }

    public static void main(String[] args) {

        Observable<String> userIdObservable = getUserIds();
        userIdObservable.subscribe(new Action1<String>() {

            public void call(String t1) {
                Observable<String> info1 = getInfo("1::: ", t1, "2");
                Observable<String> info2 = getInfo("2::: ", t1, "3");
                Observable.merge(info1, info2).subscribe(new Action1<String>() {

                    public void call(String t1) {
                        System.out.println(t1);
                    }
                }, new Action1<Throwable>() {

                    public void call(Throwable t1) {
                        t1.printStackTrace();
                    }
                },
                        new Action0() {

                            public void call() {
                                System.out.println("onComplete");
                            }

                        });
            }
        });
    }
}

输出:

1::: 1
2::: 1
onComplete
java.lang.Exception
        at TestMergeDelayError$1.onSubscribe(TestMergeDelayError.java:28)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.operators.OperationMerge$MergeObservable$ParentObserver.onNext(OperationMerge.java:164)
        at rx.operators.OperationMerge$MergeObservable$ParentObserver.onNext(OperationMerge.java:116)
        at rx.operators.OperationToObservableIterable$ToObservableIterable.onSubscribe(OperationToObservableIterable.java:94)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.operators.OperationMerge$MergeObservable.onSubscribe(OperationMerge.java:106)
        at rx.operators.OperationMerge$1.onSubscribe(OperationMerge.java:56)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.Observable.protectivelyWrapAndSubscribe(Observable.java:320)
        at rx.Observable.subscribe(Observable.java:483)
        at TestMergeDelayError$2.call(TestMergeDelayError.java:47)
        at TestMergeDelayError$2.call(TestMergeDelayError.java:42)
        at rx.Observable$2.onNext(Observable.java:381)
        at rx.operators.SafeObserver.onNext(SafeObserver.java:102)
        at rx.operators.OperationToObservableIterable$ToObservableIterable.onSubscribe(OperationToObservableIterable.java:94)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.Observable.protectivelyWrapAndSubscribe(Observable.java:320)
        at rx.Observable.subscribe(Observable.java:367)
        at TestMergeDelayError.main(TestMergeDelayError.java:42)
1::: 3
java.lang.Exception
        at TestMergeDelayError$1.onSubscribe(TestMergeDelayError.java:28)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.operators.OperationMerge$MergeObservable$ParentObserver.onNext(OperationMerge.java:164)
        at rx.operators.OperationMerge$MergeObservable$ParentObserver.onNext(OperationMerge.java:116)
        at rx.operators.OperationToObservableIterable$ToObservableIterable.onSubscribe(OperationToObservableIterable.java:94)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.operators.OperationMerge$MergeObservable.onSubscribe(OperationMerge.java:106)
        at rx.operators.OperationMerge$1.onSubscribe(OperationMerge.java:56)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.Observable.protectivelyWrapAndSubscribe(Observable.java:320)
        at rx.Observable.subscribe(Observable.java:483)
        at TestMergeDelayError$2.call(TestMergeDelayError.java:47)
        at TestMergeDelayError$2.call(TestMergeDelayError.java:42)
        at rx.Observable$2.onNext(Observable.java:381)
        at rx.operators.SafeObserver.onNext(SafeObserver.java:102)
        at rx.operators.OperationToObservableIterable$ToObservableIterable.onSubscribe(OperationToObservableIterable.java:94)
        at rx.Observable.subscribe(Observable.java:241)
        at rx.Observable.protectivelyWrapAndSubscribe(Observable.java:320)
        at rx.Observable.subscribe(Observable.java:367)
        at TestMergeDelayError.main(TestMergeDelayError.java:42)
1::: 4
2::: 4
onComplete
1::: 5
2::: 5
onComplete
1::: 6
2::: 6
onComplete

如您所见,只有失败的单个用户 ID 停止了他们的单独处理,但其余用户 ID 已被处理。

只是寻求建议,看看这个解决方案是否有意义,如果没有,最好的做法是什么。

谢谢,亚历克斯

4

4 回答 4

18

既然你想忽略错误,你可以试试onErrorResumeNext(Observable.<String>empty());. 例如,

Observable<String> info1 = getInfo("1::: ", t1, "2").onErrorResumeNext(Observable.<String>empty());
Observable<String> info2 = getInfo("2::: ", t1, "3").onErrorResumeNext(Observable.<String>empty());
return Observable.merge(info1, info2);
于 2014-03-12T02:36:30.223 回答
9

最佳实践是使用将多个 Observable 合并为一个的mergeDelayError( ),允许无错误的 Observable 在传播错误之前继续。

mergeDelayError行为很像merge. 例外情况是当被合并的 Observable 之一以 onError 通知终止时。如果合并发生这种情况,合并后的 Observable 将立即发出 onError 通知并终止。另一方面,mergeDelayError 将推迟报告错误,直到它给任何其他不产生错误的 Observables 提供了一个机会来完成它们的项目,并且它会自己发射这些项目,并且只会以当所有其他合并的 Observables 完成时发出 onError 通知。

于 2015-11-24T09:31:42.137 回答
1

查看Observable.flatMap源代码:

return merge(map(func));

如果您希望处理所有可能的用户 ID,您可以继续使用 flatMap 的修改版本:

Observable.mergeDelayError(userIdObservable.map(userInfoFunc))

再进一步,如果你说:

如果用户 ID 的任何网络请求失败,则该用户 ID 将不会更新并且可以跳过

然后不要使用:

return Observable.mergeDelayError(info1, info2);

因为这将导致 info1 和 info2 都被请求,即使其中一个失败。

而是选择:

return Observable.merge(info1, info2);

当 info1 和 info2 订阅同一个线程时,它们会按顺序运行,因此如果 info1 失败,则永远不会请求 info2。由于 info1 和 info2 是 I/O 受限的,我假设您希望并行运行它们:

getInfo("1::: ", t1, "2").subscribeOn(Schedulers.io());
getInfo("2::: ",t1, "3").subscribeOn(Schedulers.io());

这应该会显着加快您的处理速度

整个代码:

public class TestMergeDelayError {

    public static Observable<String> getUserIds() {
        return Observable.from(new String[]{"1", "2", "3", "4", "5", "6"});
    }

    public static Observable<String> getInfo(final String prefix, final String integer, final String errorNumber) {
        return Observable.create(new OnSubscribeFunc<String>() {

            public Subscription onSubscribe(Observer<? super String> t1) {
                if (integer.contains(errorNumber)) {
                    t1.onError(new Exception());
                } else {
                    t1.onNext(prefix + integer);
                    t1.onCompleted();
                }
                return Subscriptions.empty();
            }
        })
        .subscribeOn(Schedulers.io());
    }

    public static void main(String[] args) {

        Observable<String> userIdObservable = getUserIds();
        Observable<String> t = Observable.mergeDelayError(userIdObservable.map(new Func1<String, Observable<String>>() {

            public Observable<String> call(final String t1) {
                Observable<String> info1 = getInfo("1::: ", t1, "2");
                Observable<String> info2 = getInfo("2::: ",t1, "3");
                return Observable.merge(info1, info2);
            }
        }));
        //rest is the same
    }
}
于 2017-01-05T15:05:55.250 回答
1

作为一个 Rx 新手,我也在寻找一个简单的答案来分别处理异常并继续处理下一个事件,但找不到@Daniele Segato 所问问题的答案。这是您无法控制的一种解决方案:

上面的示例假设您可以控制可观察对象,即,一种方法是使用 mergeDelayError 将错误延迟到最后,或者使用合并将每个事件的已知空事件 Observable 作为 Observable 单独返回。

如果是源事件错误,可以使用lift创建另一个observable,基本优雅地处理当前Observable的值。SimpleErrorEmitter 类模拟有时会失败的无界流。

Observable.create(new SimpleErrorEmitter())
        // transform errors to write to error stream
        .lift(new SuppressError<Integer>(System.err::println))
        .doOnNext(System.out::println)  // and everything else to console
        .subscribe();


class SimpleErrorEmitter implements OnSubscribe<Integer> {
@Override
public void call(Subscriber<? super Integer> subscriber) {
    subscriber.onNext(1);
    subscriber.onNext(2);

    subscriber.onError(new FooException());

    subscriber.onNext(3);
    subscriber.onNext(4);

    subscriber.onCompleted();
}

class SuppressError<T> implements Operator<T, T> {
final Action1<Throwable> onError;
public SuppressError(Action1<Throwable> onError) {
    this.onError = onError;
}
@Override
public Subscriber<? super T> call(Subscriber<? super T> t1) {
    return new Subscriber<T>(t1) {
        @Override
        public void onNext(T t) {
            t1.onNext(t);
        }
        @Override
        public void onError(Throwable e) { // handle errors using a separate function
            onError.call(e);
        }
        @Override
        public void onCompleted() {
            t1.onCompleted();
        }
    };
}

如果是订阅者处理错误,可以尝试/捕获并优雅地继续

    Observable<Integer> justInts = justStrs.map((str) -> {
        try {
            return Integer.parseInt(str);
        } catch (NumberFormatException e) {
            return null;
        }
    });

我仍在尝试找到一种简单的方法来重试或延迟尝试失败的事件并从下一个继续。

    Observable<String> justStrs = Observable
            .just("1", "2", "three", "4", "5")  // or an unbounded stream
            // both these retrying from beginning 
            // when you delay or retry, if they are of known exception type
            .retryWhen(ex -> ex.flatMap(eachex -> {
                // for example, if it is a socket or timeout type of exception, try delaying it or retrying it
                if (eachex instanceof RuntimeException) {
                    return Observable.timer(1L, TimeUnit.MICROSECONDS, Schedulers.immediate());
                }
                return Observable.error(eachex);
            }))
            // or simply retry 2 times
            .retry(2) // if it is the source problem, attempt retry
            .doOnError((ex) -> System.err.println("On Error:" + ex));

参考:https ://groups.google.com/forum/#!topic/rxjava/trm2n6S4FSc

于 2016-10-28T08:04:36.027 回答