1

我正在使用 RxJava2,假设我有这个 Observable:

Observable
   .create(emitter -> 
      SomeDependency.registerCallback(data -> emitter.onNext(data))
   )
   .subscribeOn(Schedulers.io());

它观察一些异步逻辑,然后发出它从中得到的任何东西。重要的是要知道,在注册的回调中,数据是在由SomeDependency. 因此,这会导致所有emitter到下游的排放都在该线程上传递,而忽略 defined Scheduler

https://stackoverflow.com/a/43283760/1618316中有来自 @akarnokd 的提示,指出了一种使用Scheduler.Worker. 这种方法的修改示例如下所示:

Observable
   .create(emitter -> {
      final Worker worker = Schedulers.trampoline().createWorker();
      emitter.setDisposable(worker);
      SomeDependency.registerCallback(data -> 
         worker.schedule(() -> emitter.onNext(data))
      )
   })
   .subscribeOn(Schedulers.io());

注意:为当前线程trampoline()创建一个Scheduler。在我们的例子中io(),我们已经定义了创建 Observable 的线程。

问题是,在创建这种 Observable 时,您通常不仅需要,registerCallback()而且还需要unregisterCallback(). 在常见情况下,您放入unregisterCallback()' emittersDisposable中。但是,如您所见,我们emitter已经有一个Disposable,无法设置另一个。如果第二个Disposable将被设置,那么前一个将被取消设置和处置。

请问您对如何解决这个问题有任何想法吗?

4

1 回答 1

1

我想你需要的是CompositeDisposable.

可以容纳多个其他一次性用品的一次性容器。

于 2017-05-08T00:41:17.687 回答