3

我一直在阅读一些关于 RxJava 中背压的文档,但我找不到详细的解释,比如它是如何在库内部发生的,每个人都只是总结它,比如“生产者”太快而“消费者”太慢。

例如像下面的代码:

Observable.interval(1, TimeUnit.MILLISECONDS)
    .observeOn(Schedulers.newThread())
    .subscribe(
        i -> {
            System.out.println(i);
            try {
                Thread.sleep(100);
            } catch (Exception e) { }
        },
        System.out::println);

我一直在浏览 RxJava 源代码,所以我的理解是在主线程中,我们将每毫秒发出事件,一旦我们发出它,我们将值传递给 System.out.println(i) 方法并将其扔到newThead 调度程序的线程池并在可运行对象中运行该方法。

所以我的问题是,异常是如何在内部发生的?因为我们在调用Thread.sleep()的时候,只是休眠了处理方法调用的线程-> System.out.println(),没有影响线程池中的其他线程,怎么会导致异常。是不是因为线程池没有足够的可用线程了?

谢谢

4

1 回答 1

7

您可以将背压视为一个操作员向其上游源发放许可的系统:您可以给我 128 个元素。稍后,该操作员可能会说“好的,再给我 96 个”,因此总共可能有 224 个未完成的许可证。一些来源,例如interval不关心许可证,只是定期分发值。由于许可证的数量通常与队列或缓冲区中的可用容量密切相关,因此分发的数量超过这些存储空间就可以保持收益MissingBackpressureException

检测背压违规主要发生在offer有界队列返回 false 时,例如observeOn指示队列已满的情况。

检测违规的第二种方法是跟踪操作员中未完成的许可计数,例如onBackpressureDrop,每当上游发送超过此数量时,操作员根本不会转发它:

// in onBackpressureDrop
public void onNext(T value) {
    if (emitted != availablePermits) {
        emitted++;
        child.onNext(value);
    } else {
        // ignoring this value
    }
}

子订阅者通过 request() 发出其许可信号,这通常会导致以下情况onBackpressureDrop

public void childRequested(long n) {
    availablePermits += n;
}

在实践中,由于可能的异步执行,availablePermits是一个AtomicLong(并且被称为requested)。

于 2016-08-12T10:19:31.053 回答