我一直在阅读一些关于 RxJava 中背压的文档,但我找不到详细的解释,比如它是如何在库内部发生的,每个人都只是总结它,比如“生产者”太快而“消费者”太慢。
例如像下面的代码:
Observable.interval(1, TimeUnit.MILLISECONDS)
.observeOn(Schedulers.newThread())
.subscribe(
i -> {
System.out.println(i);
try {
Thread.sleep(100);
} catch (Exception e) { }
},
System.out::println);
我一直在浏览 RxJava 源代码,所以我的理解是在主线程中,我们将每毫秒发出事件,一旦我们发出它,我们将值传递给 System.out.println(i) 方法并将其扔到newThead 调度程序的线程池并在可运行对象中运行该方法。
所以我的问题是,异常是如何在内部发生的?因为我们在调用Thread.sleep()的时候,只是休眠了处理方法调用的线程-> System.out.println(),没有影响线程池中的其他线程,怎么会导致异常。是不是因为线程池没有足够的可用线程了?
谢谢