0

我正在尝试编写一个测试来查看 onBackPressureDrop 在 RxScala 中的使用。

我正在用一个简单的压缩功能压缩一个快速的 Obserable 和一个慢速的 Obserable。

奇怪的是,RxJava 中的相同示例会产生异常,但使用 RxScala 似乎不需要 onBackPressureDrop 指令。

测试如下所示:

  @Test def testWithoutBackPressure() {
    val fast = Observable.interval(1 millis).take(100)
    val slow = Observable.interval(1000 millis).take(100)

    val res = fast.zipWith(slow)(_*_)

    res.subscribe(
      n => { println("[testWithoutBackPressure] " + n) },
      e => e.printStackTrace(),
      () => println("testWithoutBackPressure done")
    )

  }

由于没有背压,我怎样才能使这段代码失败?

4

1 回答 1

0

RxJava 中默认的内部缓冲区大小是128。所以你的代码不会抛出MissingBackpressureException. 我还将您的代码重写为 Java,但没有看到MissingBackpressureException.

因此,您可以更改take(100)take(1024)使其 throw MissingBackpressureException

于 2015-09-02T09:11:41.127 回答