我正在使用 monix 接收来自 kafka 的消息。
我想在从主题中读取消息后 10 秒处理消息。
这 10 秒分钟的延迟不应阻止阅读更多消息。
我尝试使用以下代码测试此行为,使用 Task.delayExecution 延迟 10 秒
我也试过Observable.delayOnExecution
了Observable.delayOnNext
import monix.eval.Task
import monix.reactive.Observable
object TestApp extends App {
import java.util.Date
case class SomeClass(
value: Int, consumingDate: Date,
handlerExecutionDate: Date = null
)
import scala.concurrent.duration._
import scala.concurrent.Await
import monix.execution.Scheduler.Implicits.global
val res = Observable.fromIterable((1 to 100).map(i => SomeClass(value = i, consumingDate = new Date)))
.mapEval(i =>
Task.delay(
SomeClass(
value = i.value,
consumingDate = i.consumingDate
)
).delayExecution(10 seconds)
)
.foreachL{ x =>
val r = SomeClass(
x.value,
x.consumingDate,
new Date()
)
println(r)
}.runToFuture
Await.result(res, 100.seconds)
}
但是上面的代码增加了每条消息的延迟。第一条消息延迟 10 秒,但第二条消息延迟 20 秒,第三条消息延迟 30 秒,依此类推。
使用 monix 可以做这样的事情吗?
我正在考虑基于 monix 的解决方案的其他替代方案,例如使用内存队列。消费者将继续推送到队列,直到达到限制。
更新 :
我找到了一个解决方案Task.eval(<current_time>).restartUntil(<condition>)
在下面添加代码。
package com.agoda.cusco.campaign.data.collector.consumer
import monix.eval.Task
import monix.reactive.Observable
object TestApp extends App {
import java.util.Date
case class SomeClass(
value: Int, consumingDate: Date,
handlerExecutionDate: Date = null
)
import scala.concurrent.duration._
import scala.concurrent.Await
import monix.execution.Scheduler.Implicits.global
val res = Observable.fromIterable((1 to 100).map(i => SomeClass(value = i, consumingDate = new Date(System.currentTimeMillis + i*100))))
.mapEval(message =>
Task.eval(new Date()).restartUntil(currentTime => (currentTime.getSeconds - message.consumingDate.getSeconds) > 10).map(_ => message)
)
.foreachL{ x =>
val r = SomeClass(
x.value,
x.consumingDate,
new Date()
)
println(r)
}.runToFuture
Await.result(res, 100.seconds)
}
我不完全确定它是否是理想的解决方案,因为它似乎正在进行主动 CPU 计算以使其工作。
想看看有没有更好的选择。