1

我在使用 RxJava 背压时遇到问题。基本上,我有一个生产者生产的项目比消费者可以处理的要多,并且希望有一些缓冲队列来只处理我可以处理的项目,并在我完成其中一些项目时请求,就像在这个例子中一样:

object Tester extends App {

Observable[Int] { subscriber =>
  (1 to 100).foreach { e =>
    subscriber.onNext(e)
    Thread.sleep(100)
    println("produced " + e + "(" + Thread.currentThread().getName + Thread.currentThread().getId + ")")
  }
}
.subscribeOn(NewThreadScheduler())
.observeOn(ComputationScheduler())
.subscribe(
  new Subscriber[Int]() {
    override def onStart(): Unit = {
      request(2)
    }

    override def onNext(value: Int): Unit = {
      Thread.sleep(1000)
      println("consumed " + value + "(" + Thread.currentThread().getName + Thread.currentThread().getId + ")")
      request(1)
    }

    override def onCompleted(): Unit = {
      println("finished ")
    }
})

Thread.sleep(100000)

我希望得到像这样的输出

produced 1(RxNewThreadScheduler-113)
consumed 1(RxComputationThreadPool-312)
produced 2(RxNewThreadScheduler-113)
consumed 2(RxComputationThreadPool-312)
produced 3(RxNewThreadScheduler-113)
consumed 3(RxComputationThreadPool-312)
......

但相反,我得到

produced 1(RxNewThreadScheduler-113)
produced 2(RxNewThreadScheduler-113)
produced 3(RxNewThreadScheduler-113)
produced 4(RxNewThreadScheduler-113)
produced 5(RxNewThreadScheduler-113)
produced 6(RxNewThreadScheduler-113)
produced 7(RxNewThreadScheduler-113)
produced 8(RxNewThreadScheduler-113)
produced 9(RxNewThreadScheduler-113)
consumed 1(RxComputationThreadPool-312)
produced 10(RxNewThreadScheduler-113)
produced 11(RxNewThreadScheduler-113)
produced 12(RxNewThreadScheduler-113)
produced 13(RxNewThreadScheduler-113)
.....
4

1 回答 1

1

当您实施Observable使用Observable.create时,由您来管理背压(这不是一项简单的任务)。在这里,您的 observable 只是忽略了响应式拉取请求(您只是迭代,而不是等待调用迭代器next()方法的请求)。

如果可能,请尝试使用Observable工厂方法,例如range,等等...并使用map/flatMap来获得所需的源 Observable,因为这些方法会尊重背压。

否则,请查看最近引入的用于在OnSubscribe实现中正确管理背压的实验性实用程序类:AsyncOnSubscribeSyncOnSubscribe.

这是一个非常幼稚的例子:

Observable<Integer> backpressuredObservable = 
  Observable.create(SyncOnSubscribe.createStateful(
    () -> 0, //starts the state at 0
    (state, obs) -> {
        int i = state++; //first i is 1 as desired
        obs.next(i);
        if (i == 100) { //maximum is 100, stop there
            obs.onCompleted();
        }
        return i; //update the state
}));
于 2016-01-16T16:09:26.633 回答