0

我有一个Observable<Item> (A),每次 a PublishSubject<Item> (B)有一个新的Item.

然后使用A,例如A.subscribeOn(computationScheduler).observeOn(mainThread)


目标:让来自A 的完整流在计算调度程序上完成工作,然后在主线程上使用结果。

实际:根据观察到B的位置,整个流将相应地在不同的调度程序上运行。在下面的例子中——在主线程上,即使是通过.subscribeOn()调用。

如何“强制”来自 A 的完整流在给定调度程序上完成其工作,并在另一个调度程序上分派结果? A.compose() 不能解决问题。


实际代码:

class SomeClass

private final PublishSubject<ExerciseQueryOptions> queryOptionsPublishSubject = PublishSubject.create();

@NonNull
@Override
public Observable<List<ExerciseViewModel>> call() {
    return queryOptionsPublishSubject
            .startWith(createQueryOptions())
            .distinctUntilChanged()
            .flatMap(new Function<ExerciseQueryOptions, ObservableSource<List<ExerciseViewModel>>>() {
                @Override
                public ObservableSource<List<ExerciseViewModel>> apply(ExerciseQueryOptions queryOptions) throws Exception {
                    //This is the code I want to run on a given scheduler,
                    //Supplied by the code that calls this .call() method.
                    return datastore.queryAll(ExerciseModel.class, true)
                            .map(transformer);                      
                }
            });
}

//Other class
SomeClass A; 

A.subscribeOn(Schedulers.computation()).observeOn(AndroidScheduers.mainThread())
.subcribe(...);
4

1 回答 1

0

您的每个元素都flatMap必须在调度程序上:

                return datastore.queryAll(ExerciseModel.class, true)
                        .subscribeOn(Schedulers.computation())
                        .map(transformer);                      

您还需要在计算调度程序上订阅整个流,否则初始订阅将在当前线程上运行。

于 2016-11-20T13:17:12.463 回答