我想通了,通过测试。以下测试通过(这意味着 Observable 上的发射都在同一个后台线程上):
volatile long observableThreadId;
@Test
public void transformedObservables_shouldRunInSameThread() {
Observable.from(new String[]{"a", "b", "c"}) //
.flatMap(new Func1<String, Observable<Object>>() {
@Override public Observable<Object> call(String s) {
observableThreadId = Thread.currentThread().getId();
return Observable.from((Object) s);
}
}) //
.map(new Func1<Object, String>() {
@Override public String call(Object o) {
long id = Thread.currentThread().getId();
if (id != observableThreadId) {
throw new RuntimeException("Thread ID mismatch");
}
return (String) o;
}
}) //
.flatMap(new Func1<String, Observable<String>>() {
@Override public Observable<String> call(String s) {
long id = Thread.currentThread().getId();
if (id != observableThreadId) {
throw new RuntimeException("Thread ID mismatch");
}
return Observable.from(s);
}
}) //
.subscribeOn(Schedulers.newThread()) //
.observeOn(Schedulers.currentThread()) //
.subscribe(new Observer<String>() {
@Override public void onCompleted() {
assertThat(Thread.currentThread().getId()).isNotEqualTo(observableThreadId);
}
@Override public void onError(Throwable throwable) {
}
@Override public void onNext(String s) {
}
});
System.out.println("blah");
}
================================ 更新:
实际上可以在Scheduler上的 ReactiveX 文档中找到更好的答案:
默认情况下,一个 Observable 和你应用到它的操作符链将在调用它的Subscribe方法的同一个线程上完成它的工作,并通知它的观察者。SubscribeOn操作符通过指定 Observable 应该在其上运行的不同调度程序来更改此行为。ObserveOn操作符指定了一个不同的调度器,Observable 将使用它来向它的观察者发送通知。
... SubscribeOn运算符指定 Observable 将开始在哪个线程上运行,无论在运算符链中的哪个点调用该运算符。另一方面, ObserveOn会影响 Observable 将在该运算符出现的位置下方使用的线程。出于这个原因,您可以在 Observable 运算符链中的不同点多次调用 ObserveOn,以更改某些运算符在哪些线程上运行。