4

假设我在Observable上有一堆转换:

operation()
    .flatMap(toSomething())
    .map(toSomethingElse())
    .flatMap(toYetSomethingElse())
    .subscribeOn(Schedulers.newThread())
    .observeOn(AdroidSchedulers.mainThread())
    .subscribe(observer);

除了最后一次调用之外,所有这些操作是否同步flatMap()?还是所有操作都在我告诉它订阅的线程上运行?

4

1 回答 1

2

我想通了,通过测试。以下测试通过(这意味着 Observable 上的发射都在同一个后台线程上):

    volatile long observableThreadId;

    @Test
    public void transformedObservables_shouldRunInSameThread() {

        Observable.from(new String[]{"a", "b", "c"}) //
            .flatMap(new Func1<String, Observable<Object>>() {
                @Override public Observable<Object> call(String s) {
                    observableThreadId = Thread.currentThread().getId();
                    return Observable.from((Object) s);
                }
            }) //
            .map(new Func1<Object, String>() {
                @Override public String call(Object o) {
                    long id = Thread.currentThread().getId();
                    if (id != observableThreadId) {
                        throw new RuntimeException("Thread ID mismatch");
                    }

                    return (String) o;
                }
            }) //
            .flatMap(new Func1<String, Observable<String>>() {
                @Override public Observable<String> call(String s) {
                    long id = Thread.currentThread().getId();
                    if (id != observableThreadId) {
                        throw new RuntimeException("Thread ID mismatch");
                    }

                    return Observable.from(s);
                }
            }) //
            .subscribeOn(Schedulers.newThread()) //
            .observeOn(Schedulers.currentThread()) //
            .subscribe(new Observer<String>() {
                @Override public void onCompleted() {
                    assertThat(Thread.currentThread().getId()).isNotEqualTo(observableThreadId);
                }

                @Override public void onError(Throwable throwable) {

                }

                @Override public void onNext(String s) {

                }
            });

        System.out.println("blah");
    }

================================ 更新:

实际上可以在Scheduler上的 ReactiveX 文档中找到更好的答案:

默认情况下,一个 Observable 和你应用到它的操作符链将在调用它的Subscribe方法的同一个线程上完成它的工作,并通知它的观察者。SubscribeOn操作符通过指定 Observable 应该在其上运行的不同调度程序来更改此行为。ObserveOn操作符指定了一个不同的调度器,Observable 将使用它来向它的观察者发送通知。

... SubscribeOn运算符指定 Observable 将开始在哪个线程上运行,无论在运算符链中的哪个点调用该运算符。另一方面, ObserveOn会影响 Observable 将在该运算符出现的位置下方使用的线程。出于这个原因,您可以在 Observable 运算符链中的不同点多次调用 ObserveOn,以更改某些运算符在哪些线程上运行。

于 2014-03-11T21:05:27.540 回答