0

我正在为我要完成的任务编写一个简单的示例。假设有一个tasks我想每 1 秒触发一次的任务列表 () 。

这可以通过 ascheduler或其他方式来完成。

现在这个流有两个消费者,但是

  • C1应在完成所有任务时触发
  • C2应该在所有任务的第 n 次完成时触发。(也可以每隔 n 秒)

这是一些示例代码。目前,它没有计划重复 - 因为我不知道Observable.repeat使用Scheduler.

import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.execution.Scheduler.{global => scheduler}
import monix.reactive.{Consumer, Observable}

import scala.concurrent.duration._

object MainTest {

  def main(args: Array[String]): Unit = {

    def t = (i: Int) => Observable.eval {
      print(i)
      i
    }

    val tsks = (1 to 5).map(t)

    val tasks = Observable.fromIterable(tsks).flatten.doOnCompleteEval(Task.eval(println("")))

    val c1 = Consumer.foreach[Int](x => println(s"C1: [$x]"))
    val c2 = Consumer.foreach[Int](x => println(s"C2: [$x]"))

    val s = tasks.reduce(_ + _).publish

    s.consumeWith(c1).runAsync
    s.consumeWith(c2).runAsync
    s.connect()

    while (true) {
      Thread.sleep(1.hour.toMillis)
    }
  }

}
4

1 回答 1

2

首先,对于每 1 秒重复一次任务,您可以...

Observable.intervalAtFixedRate(1.second)
  .flatMap(_ => Observable.eval(???))

要在所有任务完成时触发,您可以使用completed(如果您想要Observable[Nothing]只发出最终完成事件的 an)或completedL(如果您想使用 aTask[Unit]代替)。有关详细信息,请参阅API 文档

因此,您可以执行以下操作,而不是您c1的事情:

s.completeL.runAsync

但是,为了对源进行采样,您可以使用:

  • sample(别名throttleLast
  • sampleRepeated
  • throttleFirst
  • debounce
  • debounceRepeated
  • echo
  • echoRepeated

我鼓励你玩这些,从API 文档开始。

s.sample(10.seconds).doOnNext(println).completedL.runAsync

或者,您可以简单地将每 N 个元素与takeEveryNth

s.takeEveryNth(20).doOnNext(println).completedL.runAsync

让我知道这是否回答了您的问题。

于 2017-11-03T15:08:04.910 回答