我有一个Observable<Item>
(A),每次 a PublishSubject<Item>
(B)有一个新的Item
.
然后使用A,例如A.subscribeOn(computationScheduler).observeOn(mainThread)
目标:让来自A 的完整流在计算调度程序上完成工作,然后在主线程上使用结果。
实际:根据观察到B的位置,整个流将相应地在不同的调度程序上运行。在下面的例子中——在主线程上,即使是通过.subscribeOn()
调用。
如何“强制”来自 A 的完整流在给定调度程序上完成其工作,并在另一个调度程序上分派结果? A.compose() 不能解决问题。
实际代码:
class SomeClass
private final PublishSubject<ExerciseQueryOptions> queryOptionsPublishSubject = PublishSubject.create();
@NonNull
@Override
public Observable<List<ExerciseViewModel>> call() {
return queryOptionsPublishSubject
.startWith(createQueryOptions())
.distinctUntilChanged()
.flatMap(new Function<ExerciseQueryOptions, ObservableSource<List<ExerciseViewModel>>>() {
@Override
public ObservableSource<List<ExerciseViewModel>> apply(ExerciseQueryOptions queryOptions) throws Exception {
//This is the code I want to run on a given scheduler,
//Supplied by the code that calls this .call() method.
return datastore.queryAll(ExerciseModel.class, true)
.map(transformer);
}
});
}
//Other class
SomeClass A;
A.subscribeOn(Schedulers.computation()).observeOn(AndroidScheduers.mainThread())
.subcribe(...);