我正在为我要完成的任务编写一个简单的示例。假设有一个tasks
我想每 1 秒触发一次的任务列表 () 。
这可以通过 ascheduler
或其他方式来完成。
现在这个流有两个消费者,但是
C1
应在完成所有任务时触发C2
应该在所有任务的第 n 次完成时触发。(也可以每隔 n 秒)
这是一些示例代码。目前,它没有计划重复 - 因为我不知道Observable.repeat
使用Scheduler
.
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.execution.Scheduler.{global => scheduler}
import monix.reactive.{Consumer, Observable}
import scala.concurrent.duration._
object MainTest {
def main(args: Array[String]): Unit = {
def t = (i: Int) => Observable.eval {
print(i)
i
}
val tsks = (1 to 5).map(t)
val tasks = Observable.fromIterable(tsks).flatten.doOnCompleteEval(Task.eval(println("")))
val c1 = Consumer.foreach[Int](x => println(s"C1: [$x]"))
val c2 = Consumer.foreach[Int](x => println(s"C2: [$x]"))
val s = tasks.reduce(_ + _).publish
s.consumeWith(c1).runAsync
s.consumeWith(c2).runAsync
s.connect()
while (true) {
Thread.sleep(1.hour.toMillis)
}
}
}