6

最近我意识到我不明白RxJava2背压是如何工作的。

我做了一个小测试,我希望它应该失败并出现MissingBackpressureException异常:

@Test
public void testBackpressureWillFail() {
    Observable.<Integer>create(e -> {
        for (int i = 0; i < 10000; i++) {
            System.out.println("Emit: " + i);
            e.onNext(i);
        }
        e.onComplete();
    })
    .subscribeOn(Schedulers.newThread())
    .observeOn(Schedulers.computation())
    .doOnNext(i -> {
        Thread.sleep(100);
        System.out.println("Processed:" + i);
    })
    .blockingSubscribe();
}

系统输出显示如下:

Emit: 0
Emit: 1
Emit: 2
...
Emit: 10000

Processed:0
Processed:1
Processed:2
...
Processed:10000

为什么它不产生MissingBackpressureException

我希望这e.onNext(i);会将项目放入缓冲区ObservableObserveOn并在其大小大于之后static final int BUFFER_SIZE = Math.max(16,Integer.getInteger("rx2.buffer-size",128).intValue());

它应该抛出MissingBackpressureException不会发生的。缓冲区会自动增长吗?如果不是,物品存放在哪里?

4

1 回答 1

3

这是因为背压Flowable仅适用于 RxJava2,请参见此处
如果您将切换到FlowableBackpressureStrategy.MISSING您将得到例外。
这也意味着在您的情况下,您确实有自动增长的缓冲区,来自observerOn文档:

修改 ObservableSource 以在指定的调度程序上执行其发射和通知,与无界缓冲区异步...

于 2017-06-21T11:42:19.717 回答