我正在对队列进行一些基本测试,我想在队列出队之前添加一个服务时间。例如,服务时间没有定义,这将是 3 秒的平均值,它可能因元素而异。
有人可以提供一些启示吗?
这是我正在尝试做的事情:例如,对于 1,可能需要 3 秒,对于 2 可能需要 5 秒,对于 3 可能需要 4 秒......这个想法是平均服务时间,例如 4 秒......
import cats.effect.{ExitCode, IO, IOApp, Timer}
import fs2._
import fs2.concurrent.Queue
import scala.concurrent.duration._
class StreamTypeIntToDouble(q: Queue[IO, (Double, String)])(
implicit timer: Timer[IO]
) {
def storeInQueue: Stream[IO, Unit] = {
Stream(1, 2, 3,4)
.covary[IO]
.evalTap(n => IO.delay(println(s"Pushing $n to Queue")))
.map { n =>
(n.toDouble, "Service")
}
.through(q.enqueue)
}
def getFromQueue: Stream[IO, Unit] = {
q.dequeue.evalMap(n => IO.delay(println(s"Pulling from queue $n")))
}
}
object Five extends IOApp {
override def run(args: List[String]): IO[ExitCode] = {
val program = for {
q <- Queue.bounded[IO, (Double, String)](10)
b = new StreamTypeIntToDouble(q)
_ <- b.storeInQueue.compile.drain.start
_ <- b.getFromQueue.compile.drain
} yield ()
program.as(ExitCode.Success)
}
}