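I'm having a problem with backpressure in RxJava. Basically, I have a producer that produces more items than the consumer can handle, and I would like some kind of buffer queue so that I only process the items I can handle and request more as I finish some of them, as in this example: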
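import rx.lang.scala.{Observable, Subscriber}
import rx.lang.scala.schedulers.{ComputationScheduler, NewThreadScheduler}

object Tester extends App {
  // Producer: emits one item roughly every 100 ms
  Observable[Int] { subscriber =>
    (1 to 100).foreach { e =>
      subscriber.onNext(e)
      Thread.sleep(100)
      println("produced " + e + "(" + Thread.currentThread().getName + Thread.currentThread().getId + ")")
    }
  }
    .subscribeOn(NewThreadScheduler())
    .observeOn(ComputationScheduler())
    .subscribe(
      new Subscriber[Int]() {
        // Ask for two items up front
        override def onStart(): Unit = {
          request(2)
        }
        // Consumer: takes about 1 s per item, then asks for one more
        override def onNext(value: Int): Unit = {
          Thread.sleep(1000)
          println("consumed " + value + "(" + Thread.currentThread().getName + Thread.currentThread().getId + ")")
          request(1)
        }
        override def onCompleted(): Unit = {
          println("finished ")
        }
      })
  // Keep the main thread alive while the background threads run
  Thread.sleep(100000)
}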
I expected to get output like this:
produced 1(RxNewThreadScheduler-113)
consumed 1(RxComputationThreadPool-312)
produced 2(RxNewThreadScheduler-113)
consumed 2(RxComputationThreadPool-312)
produced 3(RxNewThreadScheduler-113)
consumed 3(RxComputationThreadPool-312)
......
But instead, I get:
produced 1(RxNewThreadScheduler-113)
produced 2(RxNewThreadScheduler-113)
produced 3(RxNewThreadScheduler-113)
produced 4(RxNewThreadScheduler-113)
produced 5(RxNewThreadScheduler-113)
produced 6(RxNewThreadScheduler-113)
produced 7(RxNewThreadScheduler-113)
produced 8(RxNewThreadScheduler-113)
produced 9(RxNewThreadScheduler-113)
consumed 1(RxComputationThreadPool-312)
produced 10(RxNewThreadScheduler-113)
produced 11(RxNewThreadScheduler-113)
produced 12(RxNewThreadScheduler-113)
produced 13(RxNewThreadScheduler-113)
.....
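For comparison, the interleaving I'm after is roughly what a plain bounded blocking queue gives. The following is only a minimal sketch of that behaviour using `java.util.concurrent.ArrayBlockingQueue` rather than RxJava; the object name `BoundedQueueSketch`, the `run` helper, and the capacity of 2 are made up for illustration.

```scala
import java.util.concurrent.ArrayBlockingQueue
import scala.collection.mutable.ArrayBuffer

// Hypothetical illustration, not RxJava: a bounded queue between two threads.
object BoundedQueueSketch {
  // Runs one producer and one slow consumer; returns the values in consumption order.
  def run(total: Int, capacity: Int): Vector[Int] = {
    val queue = new ArrayBlockingQueue[Int](capacity)
    // Written only by the consumer thread and read after join().
    val consumed = ArrayBuffer.empty[Int]

    val producer = new Thread(new Runnable {
      override def run(): Unit =
        (1 to total).foreach { e =>
          queue.put(e) // blocks while the queue is full
          println("produced " + e)
        }
    })

    val consumer = new Thread(new Runnable {
      override def run(): Unit =
        (1 to total).foreach { _ =>
          val value = queue.take() // blocks while the queue is empty
          Thread.sleep(50)         // simulate slow processing
          println("consumed " + value)
          consumed += value
        }
    })

    producer.start()
    consumer.start()
    producer.join()
    consumer.join()
    consumed.toVector
  }

  def main(args: Array[String]): Unit = {
    run(10, 2)
    println("finished")
  }
}
```

Here the producer can only run ahead of the consumer by about the queue capacity, because `put` blocks once the queue is full. That is the kind of throttling I expected `request(n)` to give me in the Rx version.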