我遇到了令我惊讶的 Scala Observables 行为。考虑下面我的例子:
object ObservablesDemo extends App {
val oFast = Observable.interval(3.seconds).map(n => s"[FAST] ${n*3}")
val oSlow = Observable.interval(7.seconds).map(n => s"[SLOW] ${n*7}")
val oBoth = (oFast merge oSlow).take(8)
oBoth.subscribe(println(_))
oBoth.toBlocking.toIterable.last
}
该代码演示了从两个可观察对象发射元素。其中一个以“慢”的方式(每 7 秒)发射其元素,另一个以“快速”的方式(每 3 秒)发射。为了这个问题,假设我们想使用map
函数定义这些可观察对象,并从上面看到的适当映射数字interval
(而不是另一种可能的方法,即从两个可观察对象以相同的速率发射项目,然后filter
根据需要退出)。
代码的输出对我来说似乎违反直觉:
[FAST] 0
[FAST] 3
[SLOW] 0
[FAST] 6
[FAST] 9 <-- HERE
[SLOW] 7 <-- HERE
[FAST] 12
[FAST] 15
有问题的部分是当 observable 在[FAST]
observable 发出9
之前[SLOW]
发出7
。我希望在第 7 秒发出的内容之前发出,因为在第 9 秒发出的内容之前7
。9
我应该如何修改代码以实现预期的行为?我查看了 RxScala 文档并开始搜索不同的interval
函数和Scheduler
类等主题,但我不确定它是否是搜索答案的正确位置。