我有这个 RxJava2 可流动的
private val pulseFlowable = Flowable
.interval(200, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.computation())
和这个订阅者
pulseFlowable
.observeOn(Schedulers.computation())
.subscribe(
object : Subscriber<Long> {
override fun onSubscribe(subscription: Subscription) {
pulseSubscription = subscription
}
override fun onNext(long: Long?) {
onPulse()
}
override fun onError(throwable: Throwable) {
Timber.e("onError")
}
override fun onComplete() {
Timber.d("onComplete")
}
}
)
它产生一个MissingBackPressureException
,我“有点”理解:由于我在同一个调度程序上订阅和观察,我最初认为可能不会发生背压,但也许这是一个误解。
当我将流动性更改为
private val pulseFlowable = Flowable
.interval(200, TimeUnit.MILLISECONDS)
.onBackpressureLatest()
.subscribeOn(Schedulers.computation())
我不再得到异常,但也没有调用 onNext。这是为什么?