15

所以我在玩 RX(真的很酷),我一直在转换我的 api,它访问 Android 中的 sqlite 数据库以返回 observables。

所以很自然地,我开始尝试解决的问题之一是,“如果我想进行 3 次 API 调用,获取结果,然后在它们都完成后进行一些处理,该怎么办?”

我花了一两个小时,但我最终找到了Zip 功能,它可以轻松帮助我:

    Observable<Integer> one = getNumberedObservable(1);
    Observable<Integer> two = getNumberedObservable(2);
    Observable<Integer> three = getNumberedObservable(3);

    Observable.zip(one, two, three, new Func3<Integer, Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer arg0, Integer arg1, Integer arg2) {
            System.out.println("Zip0: " + arg0);
            System.out.println("Zip1: " + arg1);
            System.out.println("Zip2: " + arg2);
            return arg0 + arg1 + arg2;
        }
    }).subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer arg0) {
            System.out.println("Zipped Result: " + arg0);
        }
    });

public static Observable<Integer> getNumberedObservable(final int value) {
    return Observable.create(new OnSubscribeFunc<Integer>() {
        @Override
        public Subscription onSubscribe(Observer<? super Integer> observer) {
            observer.onNext(value);
            observer.onCompleted();
            return Subscriptions.empty();
        }
    });
}

伟大的!所以这很酷。

因此,当我压缩 3 个 observables 时,它们会串行运行。如果我希望它们全部同时并行运行,以便最终更快地获得结果怎么办?我玩过一些东西,甚至尝试阅读一些人们用 C# 编写的原始 RX 东西。我确信有一个简单的答案。谁能指出我正确的方向?这样做的正确方法是什么?

4

3 回答 3

18

zip 确实并行运行 observables - 但它也连续订阅它们。由于您getNumberedObservable是在订阅方法中完成的,因此给人以串行运行的印象,但实际上并没有这样的限制。

您可以尝试使用一些长期运行的 Observables,这些 Observables 的订阅逻辑寿命更长,例如timer,或者使用该subscribeOn方法异步订阅传递给的每个流zip

于 2014-01-18T18:52:28.753 回答
5

在 RxJava 中,使用toAsync将常规函数转换为将在线程上运行并在 observable 中返回其结果的函数。

我不太了解 Java 语法,但它看起来像:

public static Integer getNumber(final int value) { return value; }
public static Observable<Integer> getNumberedObservable(final int value) {
    return rx.util.functions.toAsync(new Func<Integer,Integer>() {
        @Override
        public Integer call(Integer value) { return getNumber(value); }
    });
};

getNumber如果真的要访问数据库,那将起作用。当您调用getNumberedObservable它时,它会返回一个 observable,getNumber当您订阅它时,它将在单独的线程上运行。

于 2014-01-20T02:39:08.577 回答
4

我试图做同样的事情,使用zip并行运行多个线程。我结束了打开一个新的so问题并得到了答案。基本上,你必须为每个 observable 订阅一个新线程,所以如果你想使用 zip 并行运行三个 observable,你必须订阅 3 个单独的线程。

于 2016-07-07T14:49:12.927 回答