我正在尝试递归定义一个可观察对象,它要么从主题发出项目,要么如果经过一定时间,则为默认值,在这种情况下,我使用计时器的默认值零。我正在使用 RxScala 并从以下代码开始:
val s = PublishSubject[Int]()
def o: Observable[Unit] = {
val timeout = Observable.timer(1 second)
Observable.amb(s, timeout)
.first
.concatMap((v) => {
println(v)
o
})
}
ComputationScheduler().createWorker.schedule {
var value = 0
def loop(): Unit = {
Thread.sleep(5000)
s.onNext(value + 1)
value += 1
loop()
}
loop()
}
o.toBlocking.last
这似乎应该工作,但输出令人困惑。每隔一个零序列包含两个而不是预期的四个。发出两个零,剩下的三秒过去了,但没有输出。
0
0
0
0
1
0
0
2
0
0
0
0
3
0
0
4
0
0
0
0
5
0
0
6
0
0
0
0
7
0
0
8