我有一个Observable<String>
执行一些工作的。完成后,它会关闭其连接(通过.setDisposable(Disposables.fromAction(logic*);
. 麻烦的是,我需要在与实际工作负载相同的调度程序上执行此关闭操作。
可能吗?如何?
我不想强制 Observable 在给定的调度程序上执行,例如 myObservable.subscribeOn(x).unsubscribeOn(x);
;
您需要一个单线程调度程序才能工作,您可以通过Schedulers.from(Executor)
或新获得一个SingleScheduler
(警告:内部)。
问题unsubscribeOn
在于它仅在下游实际处理/取消流时才运行(与几乎所有时间都发生这种情况的 1.x 不同)。
更好的方法是使用using
上述自定义调度程序并手动安排清理:
Observable<T> createObservable(Scheduler scheduler) {
return Observable.create(s -> {
Resource res = ...
s.setCancellable(() ->
scheduler.scheduleDirect(() ->
res.close() // may need try-catch here
)
);
s.onNext(...);
s.onComplete();
}).subscribeOn(scheduler);
}
Scheduler scheduler = new SingleScheduler();
createObservable(scheduler)
.map(...)
.filter(...)
.subscribe(...);
但请注意,除非create
逻辑放弃调度程序(通过终止或异步),否则取消逻辑可能由于相同池活锁而永远不会执行。