1

我正在对队列进行一些基本测试,我想在队列出队之前添加一个服务时间。例如,服务时间没有定义,这将是 3 秒的平均值,它可能因元素而异。

有人可以提供一些启示吗?

这是我正在尝试做的事情:例如,对于 1,可能需要 3 秒,对于 2 可能需要 5 秒,对于 3 可能需要 4 秒......这个想法是平均服务时间,例如 4 秒......

import cats.effect.{ExitCode, IO, IOApp, Timer}
import fs2._
import fs2.concurrent.Queue

import scala.concurrent.duration._
class StreamTypeIntToDouble(q: Queue[IO, (Double, String)])(
  implicit timer: Timer[IO]
) {
  def storeInQueue: Stream[IO, Unit] = {

    Stream(1, 2, 3,4)
      .covary[IO]
      .evalTap(n => IO.delay(println(s"Pushing $n to Queue")))
      .map { n =>

        (n.toDouble, "Service")
      }
      .through(q.enqueue)

  }
  def getFromQueue: Stream[IO, Unit] = {
    q.dequeue.evalMap(n => IO.delay(println(s"Pulling from queue $n")))
  }
}

object Five extends IOApp {
  override def run(args: List[String]): IO[ExitCode] = {
    val program = for {
      q <- Queue.bounded[IO, (Double, String)](10)
      b = new StreamTypeIntToDouble(q)
      _ <- b.storeInQueue.compile.drain.start
      _ <- b.getFromQueue.compile.drain
    } yield ()
    program.as(ExitCode.Success)
  }
}
4

0 回答 0