3

我目睹了 onBackpressureBuffer 的奇怪行为,我不确定它是有效行为还是错误。

我有一个 tcp 调用,它以一定的速率发射项目(使用流和 inputStream,但这只是为了一些信息)

最重要的是,我使用 create 创建了一个 observable,每次准备就绪时都会发出一个项目。

我们称之为messages()。

然后我这样做:

messages()
   .subscribeOn(Schedulers.io())
   .observeOn(AndroidSchedulers.mainThread())
   .subscribe({//do some work});

我注意到使用分析工具时很少抛出 MissingBackPressureException,因此我在调用中添加了 onBackpressureBuffer。

如果我在之后添加它observeOn

messages()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .onBackpressureBuffer()
    .subscribe({//do some work})

一切工作正常,但这意味着它只有在到达 UI 主线程后才会缓冲,所以我更喜欢它是这样的:

messages()
    .onBackpressureBuffer()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe({//do some work});

这就是事情开始变得奇怪的地方。

我注意到虽然messages()一直在发出项目,但在某些时候它们将停止交付给订阅者。

更准确地说,恰好在 16 个项目之后,显然正在发生的事情是缓冲区将开始保存项目而不将它们向前传递。

一旦我messages()使用某种超时机制取消了它,它将导致messages()发出onError(),并且缓冲区将立即发出它保留的所有项目(它们将被处理)。

我已经检查过是否是订户做太多工作的错,但不是,他已经完成了,但他仍然没有得到物品......

我也尝试过request(n)在订阅者中使用该方法,在完成后要求一项,onNext()但缓冲区不起作用。

我怀疑主 Android UI 线程的消息系统与背压导致了这种情况,但我无法解释原因。

有人可以解释为什么会这样吗?这是一个错误还是一个有效的行为?天呐!

4

2 回答 2

5

不知道如何,根据所描述的行为,这是与此问题messages()类似的相同池死锁

解决方法,你没有尝试,是把和.onBackpressureBuffer 之间subscribeOnobserveOn

messages()
.subscribeOn(Schedulers.io())
.onBackpressureBuffer()           // <---------------------
.observeOn(AndroidSchedulers.mainThread())
.subscribe({//do some work});
于 2016-10-20T09:12:05.703 回答
1

问题不同,但答案归结为同一件事。observeOn构造函数的实现OperatorObserveOn(Scheduler scheduler, boolean delayError, int bufferSize)::

public OperatorObserveOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    this.scheduler = scheduler;
    this.delayError = delayError;
    this.bufferSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;

}

最后一行指向缓冲区大小。

Android 上的缓冲区大小为16

解决方案只是将更大的缓冲区大小传递给observeOn(Scheduler scheduler, int bufferSize)操作员:

messages()
    .observeOn(AndroidSchedulers.mainThread(), {buffer_size})

注意不要设置太高的值,因为 Android 的内存有限。

于 2016-10-20T09:19:10.487 回答