我目睹了 onBackpressureBuffer 的奇怪行为,我不确定它是有效行为还是错误。
我有一个 tcp 调用,它以一定的速率发射项目(使用流和 inputStream,但这只是为了一些信息)
最重要的是,我使用 create 创建了一个 observable,每次准备就绪时都会发出一个项目。
我们称之为messages()。
然后我这样做:
messages()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({//do some work});
我注意到使用分析工具时很少抛出 MissingBackPressureException,因此我在调用中添加了 onBackpressureBuffer。
如果我在之后添加它observeOn
:
messages()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.onBackpressureBuffer()
.subscribe({//do some work})
一切工作正常,但这意味着它只有在到达 UI 主线程后才会缓冲,所以我更喜欢它是这样的:
messages()
.onBackpressureBuffer()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({//do some work});
这就是事情开始变得奇怪的地方。
我注意到虽然messages()
一直在发出项目,但在某些时候它们将停止交付给订阅者。
更准确地说,恰好在 16 个项目之后,显然正在发生的事情是缓冲区将开始保存项目而不将它们向前传递。
一旦我messages()
使用某种超时机制取消了它,它将导致messages()
发出onError()
,并且缓冲区将立即发出它保留的所有项目(它们将被处理)。
我已经检查过是否是订户做太多工作的错,但不是,他已经完成了,但他仍然没有得到物品......
我也尝试过request(n)
在订阅者中使用该方法,在完成后要求一项,onNext()
但缓冲区不起作用。
我怀疑主 Android UI 线程的消息系统与背压导致了这种情况,但我无法解释原因。
有人可以解释为什么会这样吗?这是一个错误还是一个有效的行为?天呐!