0

我正在用 reactiveX Zip 做一些实验,我注意到我在我的 zip 中定义的 observables 是一个接一个地按顺序执行的。我认为 zip 的好处是在 zip 中定义的每一个 observable 都由一个线程执行,所以它们都是并行执行的。有什么方法可以实现我想要的吗?这是我的 zip 示例

         @Test
public void testZip() {
    Observable.zip(obString(), obString1(), obString2(), (s, s2, s3) -> s.concat(s2)
                                                                     .concat(s3))
              .subscribe(System.out::println);
}

public Observable<String> obString() {
    System.out.println(Thread.currentThread().getId());
    return Observable.just("hello");
}

public Observable<String> obString1() {
    System.out.println(Thread.currentThread().getId());
    return Observable.just(" world");
}

public Observable<String> obString2() {
    System.out.println(Thread.currentThread().getId());
    return Observable.just("!");
}
4

1 回答 1

2

你在看错误的东西。

obString*都在同一个线程上执行,因为它们是在您调用它们时执行的testZip

您想要查看的是 observable 中发生情况,这无法简单地使用just,您需要一个自定义的 observable 并查看onSubscribe.

此外,您可能希望使用scheduleOn为您的Observable.

于 2015-12-20T21:19:12.797 回答