我正在使用 RxJava2,假设我有这个 Observable:
Observable
.create(emitter ->
SomeDependency.registerCallback(data -> emitter.onNext(data))
)
.subscribeOn(Schedulers.io());
它观察一些异步逻辑,然后发出它从中得到的任何东西。重要的是要知道,在注册的回调中,数据是在由SomeDependency
. 因此,这会导致所有emitter
到下游的排放都在该线程上传递,而忽略 defined Scheduler
。
在https://stackoverflow.com/a/43283760/1618316中有来自 @akarnokd 的提示,指出了一种使用Scheduler.Worker
. 这种方法的修改示例如下所示:
Observable
.create(emitter -> {
final Worker worker = Schedulers.trampoline().createWorker();
emitter.setDisposable(worker);
SomeDependency.registerCallback(data ->
worker.schedule(() -> emitter.onNext(data))
)
})
.subscribeOn(Schedulers.io());
注意:为当前线程
trampoline()
创建一个Scheduler
。在我们的例子中io()
,我们已经定义了创建 Observable 的线程。
问题是,在创建这种 Observable 时,您通常不仅需要,registerCallback()
而且还需要unregisterCallback()
. 在常见情况下,您放入unregisterCallback()
' emitter
sDisposable
中。但是,如您所见,我们emitter
已经有一个Disposable
,无法设置另一个。如果第二个Disposable
将被设置,那么前一个将被取消设置和处置。
请问您对如何解决这个问题有任何想法吗?