0

我有一个Observable<String>执行一些工作的。完成后,它会关闭其连接(通过.setDisposable(Disposables.fromAction(logic*);. 麻烦的是,我需要在与实际工作负载相同的调度程序上执行此关闭操作。

可能吗?如何?

我不想强制 Observable 在给定的调度程序上执行,例如 myObservable.subscribeOn(x).unsubscribeOn(x);

4

1 回答 1

4

您需要一个单线程调度程序才能工作,您可以通过Schedulers.from(Executor)或新获得一个SingleScheduler(警告:内部)。

问题unsubscribeOn在于它仅在下游实际处理/取消流时才运行(与几乎所有时间都发生这种情况的 1.x 不同)。

更好的方法是使用using上述自定义调度程序并手动安排清理:

Observable<T> createObservable(Scheduler scheduler) {
    return Observable.create(s -> {
        Resource res = ...

        s.setCancellable(() -> 
            scheduler.scheduleDirect(() ->
                res.close() // may need try-catch here
            )
        );

        s.onNext(...);
        s.onComplete();
    }).subscribeOn(scheduler);
}

Scheduler scheduler = new SingleScheduler();

createObservable(scheduler)
.map(...)
.filter(...)
.subscribe(...);

但请注意,除非create逻辑放弃调度程序(通过终止或异步),否则取消逻辑可能由于相同池活锁而永远不会执行。

于 2016-11-09T14:51:59.203 回答