0

我正在学习monix 3
下一个代码:

object Main extends TaskApp {
  override def runc = {
    Observable.fromIterable(1 to 10)
      .map{i =>
        val delay = Random.nextInt(1000) + 1000
        println(s"Starting $i, delay = $delay")
        Thread.sleep(delay)     // Imitation of hard execution
        i
      }
      .map{i =>
        val delay = Random.nextInt(1000) + 1000
        println(s"Continue $i, delay = $delay")
        Thread.sleep(delay)
        i
      }
      .consumeWith(Consumer.loadBalance(3, Consumer.foreach(i => println(s"End $i"))))   //Compile error here
  }
}

导致编译错误:

缺少参数类型
.consumeWith(Consumer.loadBalance(3, Consumer.foreach(i => println(s"End $i"))))

我想不通,这里出了什么问题,以及如何使这段代码编译?

UPD
第二个问题是如何每n分钟重复一次这个流?

4

1 回答 1

3

作为对第一个问题的回答,您必须明确地给一个类型参数foreach

Consumer.foreach[Int](i => println(s"End $i"))

要回答您的第二个问题,请使用Observable.intervalAtFixedRateor Observable.intervalAtFixedDelay

请参考Monix Scaladoc

我希望这有帮助。

于 2018-10-18T08:13:32.217 回答