2

我正在使用 monix 接收来自 kafka 的消息。

我想在从主题中读取消息后 10 秒处理消息。

这 10 秒分钟的延迟不应阻止阅读更多消息。

我尝试使用以下代码测试此行为,使用 Task.delayExecution 延迟 10 秒

我也试过Observable.delayOnExecutionObservable.delayOnNext

import monix.eval.Task
import monix.reactive.Observable

object TestApp extends App {
  import java.util.Date
  case class SomeClass(
                        value: Int, consumingDate: Date,
                        handlerExecutionDate: Date = null
                      )
  import scala.concurrent.duration._
  import scala.concurrent.Await
  import monix.execution.Scheduler.Implicits.global

  val res = Observable.fromIterable((1 to 100).map(i => SomeClass(value = i, consumingDate = new Date)))
    .mapEval(i =>
      Task.delay(
        SomeClass(
          value = i.value,
          consumingDate = i.consumingDate
        )
      ).delayExecution(10 seconds)
    )
    .foreachL{ x =>
      val r = SomeClass(
        x.value,
        x.consumingDate,
        new Date()
      )
      println(r)
    }.runToFuture
  Await.result(res, 100.seconds)
}

但是上面的代码增加了每条消息的延迟。第一条消息延迟 10 秒,但第二条消息延迟 20 秒,第三条消息延迟 30 秒,依此类推。

使用 monix 可以做这样的事情吗?

我正在考虑基于 monix 的解决方案的其他替代方案,例如使用内存队列。消费者将继续推送到队列,直到达到限制。

更新 :

我找到了一个解决方案Task.eval(<current_time>).restartUntil(<condition>)

在下面添加代码。

package com.agoda.cusco.campaign.data.collector.consumer

import monix.eval.Task
import monix.reactive.Observable

object TestApp extends App {
  import java.util.Date
  case class SomeClass(
                        value: Int, consumingDate: Date,
                        handlerExecutionDate: Date = null
                      )
  import scala.concurrent.duration._
  import scala.concurrent.Await
  import monix.execution.Scheduler.Implicits.global

  val res = Observable.fromIterable((1 to 100).map(i => SomeClass(value = i, consumingDate = new Date(System.currentTimeMillis + i*100))))
    .mapEval(message =>
      Task.eval(new Date()).restartUntil(currentTime => (currentTime.getSeconds - message.consumingDate.getSeconds) > 10).map(_ => message)
    )
    .foreachL{ x =>
      val r = SomeClass(
        x.value,
        x.consumingDate,
        new Date()
      )
      println(r)
    }.runToFuture
  Await.result(res, 100.seconds)
}

我不完全确定它是否是理想的解决方案,因为它似乎正在进行主动 CPU 计算以使其工作。

想看看有没有更好的选择。

4

0 回答 0