1

我正在尝试递归定义一个可观察对象,它要么从主题发出项目,要么如果经过一定时间,则为默认值,在这种情况下,我使用计时器的默认值零。我正在使用 RxScala 并从以下代码开始:

val s = PublishSubject[Int]()

def o: Observable[Unit] = {
  val timeout = Observable.timer(1 second)
  Observable.amb(s, timeout)
    .first
    .concatMap((v) => {
      println(v)
      o
    })
}

ComputationScheduler().createWorker.schedule {
  var value = 0
  def loop(): Unit = {
    Thread.sleep(5000)
    s.onNext(value + 1)
    value += 1
    loop()
  }
  loop()
}

o.toBlocking.last

这似乎应该工作,但输出令人困惑。每隔一个零序列包含两个而不是预期的四个。发出两个零,剩下的三秒过去了,但没有输出。

0
0
0
0
1
0
0
2
0
0
0
0
3
0
0
4
0
0
0
0
5
0
0
6
0
0
0
0
7
0
0
8
4

2 回答 2

0

将您的代码重构为以下内容会生成预期的结果(在我的机器上):

object Test {
  def main(args: Array[String]) {
    val s = PublishSubject[Int]()

    val timeout = Observable.timer(1 second)

    def o: Observable[Unit] = {
      Observable.amb(s, timeout).first
        .concatMap((v) => {
          println(v)
          o
        })
    }

    var value = 0
    NewThreadScheduler().createWorker.scheduleRec {
      Thread.sleep(5000)
      value += 1
      s.onNext(value)
    }

    o.toBlocking.last
  }
}

请注意切换到 NewThreadScheduler 并使用 scheduleRec 方法,而不是手动递归调度。

于 2015-03-19T12:24:08.847 回答
0

这真是令人费解!所以这里有一个理论:

  1. 实际上,您的代码每 5 秒产生 4 个滴答声,而不是 5 个。
  2. 4号有一个竞态条件,一个,先超时获胜,然后是worker,然后是超时,等等。

所以,序列不是 00001 002 00003... 而是将其视为 0000 1002 0000...

因此,您可能在这里遇到两个不同的问题,如果不摆弄它,我将无能为力,但是您可以尝试以下方法:

  1. 还要在 o() 中添加一个序列号,这样您就可以看到哪些超时没有赢得比赛。
  2. 将值从 1 秒和 5 秒更改为彼此不成倍数的值,例如 1.5 和 5。这可能会帮助您解决一个问题并专注于另一个问题。
  3. 让一个外部的、不相关的工作人员每秒打印一次“----”。大约 0.3 秒后启动它。可能会让您更好地了解分歧在哪里。
于 2015-03-18T20:22:54.943 回答