1

在这里,我有一个 flowable,它每毫秒发出一次元素。

   Flowable<Long> source = Flowable.interval(1,TimeUnit.MILLISECONDS).take(14000);
        source.map(e->{
            Log.d("TAGBefore","before " + e);
            return e;
        })
        .onBackpressureDrop()
        .observeOn(Schedulers.computation())
        .subscribe(
                        e-> {
                            Log.d("TAGNext","onNext: " + e);
                            Thread.sleep(100);
                        },
                        e-> Log.d("TAGError","error: " + e),
                        ()-> Log.d("TAGComplete","onComplete")
        );

我使用 before 来知道 observable 发射元素的时刻,我怀疑这里从 127(当观察者满时)到 9688

   TAGNext: onNext: 125
   TAGNext: onNext: 126
   TAGNext: onNext: 127
   TAGNext: onNext: 9668
   TAGNext: onNext: 9669
   TAGNext: onNext: 9670

但是,当我更多地检查控制台(使用其他搜索过滤器)时,我意识到当 127 发出时它已经转到 12794,所以不是 9688,它不应该是 12794 或接近的数字?, 谢谢。

   TAGBefore: before: 12793
   TAGBefore: before: 12794
   TAGNext:   onNext: 127
   TAGBefore: before: 12795
   TAGBefore: before: 12796
 

但是,当我更多地检查控制台(使用其他搜索过滤器)时,我意识到当发出 127 时,它已经是 12794,所以不是 9688,它不应该是 12794 或当可观察对象已经空闲时的接近数字? ,我澄清一下我是 RxJava 的新手,以防我说错了,谢谢。

4

1 回答 1

2

observeOn有一个默认的 128 元素缓冲区,很快就会填满。

它每 100 毫秒被耗尽,直到它只剩下 32 个元素,此时它又请求 96 个元素。因此,让更多项目通过大约需要 9600 毫秒onBackpressureDrop,因此您会看到TAGNext: onNext: 9668

当第 127 个元素被耗尽时,运行大约需要 12700 毫秒,因此您TAGBefore: before: 12795从生产者端看到。

于 2020-10-21T09:19:49.913 回答