我正在尝试编写一个测试来查看 onBackPressureDrop 在 RxScala 中的使用。
我正在用一个简单的压缩功能压缩一个快速的 Obserable 和一个慢速的 Obserable。
奇怪的是,RxJava 中的相同示例会产生异常,但使用 RxScala 似乎不需要 onBackPressureDrop 指令。
测试如下所示:
@Test def testWithoutBackPressure() {
val fast = Observable.interval(1 millis).take(100)
val slow = Observable.interval(1000 millis).take(100)
val res = fast.zipWith(slow)(_*_)
res.subscribe(
n => { println("[testWithoutBackPressure] " + n) },
e => e.printStackTrace(),
() => println("testWithoutBackPressure done")
)
}
由于没有背压,我怎样才能使这段代码失败?