1

当生产者产生事件的速度快于客户消费时。

我认为将FlowableonBackpressureLatest()一起使用,我可以获得最新的事件发出。

但事实证明有一个大小为 128 的默认缓冲区。我得到的是以前缓冲的过时事件。

那么我怎样才能得到实际的最新事件呢?

这是示例代码:

Flowable.interval(40, TimeUnit.MILLISECONDS)
            .doOnNext{
                println("doOnNext $it")
            }
            .onBackpressureLatest()
            .observeOn(Schedulers.single())
            .subscribe {
                println("subscribe $it")
                Thread.sleep(100)
            }

我所期望的:

doOnNext    0
    subscribe   0
doOnNext    1
doOnNext    2
    subscribe   2
doOnNext    3
doOnNext    4
doOnNext    5
    subscribe   5
doOnNext    6
doOnNext    7
    subscribe   7
doOnNext    8
doOnNext    9
doOnNext    10
    subscribe   10
...

我得到了什么:

doOnNext    0
    subscribe   0
doOnNext    1
doOnNext    2
    subscribe   1
doOnNext    3
doOnNext    4
doOnNext    5
    subscribe   2
doOnNext    6
doOnNext    7
    subscribe   3
doOnNext    8
doOnNext    9
doOnNext    10
    subscribe   4
...
doOnNext    325
    subscribe   127
doOnNext    326
doOnNext    327
doOnNext    328
    subscribe   246
...
4

1 回答 1

5

您的问题实际上在于observeOn,默认情况下最多请求 128 个项目并将此请求传递给backpressureLatest,因此它的行为不符合您的预期。

您可以使用.observeOn(Scheduler,boolean,int)来指定应该修复您所看到的行为的缓冲区大小。

于 2018-11-30T18:42:43.490 回答