我正在探索 fs2 库并进行测试,我有一个 Stream[IO, Int] 被推入队列 [IO,(Double, String, String, String)]。我需要确定进入队列的时间以及退出时间。
这里的问题是处理时间可能因元素而异。为此,我想到了使用.metered((Random.between(1,10)).seconds)
问题是我无法找到一种方法来存储入队时的进入时间和在给定的一定时间后的退出时间.metered((Random.between(1,10)).seconds)
这是我尝试过的:
import cats.effect.{ExitCode, IO, IOApp, Timer}
import fs2._
import fs2.concurrent.Queue
import scala.concurrent.duration._
import scala.util.Random
class StreamTypeIntToDouble(q: Queue[IO, (Double, String, String, String)])(
implicit timer: Timer[IO]
) {
import core.Processing._
def storeInQueue: Stream[IO, Unit] = {
Stream(1, 2, 3)
.covary[IO]
.evalTap(n => IO.delay(println(s"Pushing $n to Queue")))
.map { n =>
val entryTime = currentTimeNow
val exitTime = currentTimeNow
(n.toDouble, "Service", entryTime, exitTime)
}
.metered(Random.between(1, 20).seconds)
.through(q.enqueue)
//.metered((Random.between(1, 10).seconds))
}
def getFromQueue: Stream[IO, Unit] = {
q.dequeue
.evalMap(n => IO.delay(println(s"Pulling from queue $n")))
}
}
object Five extends IOApp {
override def run(args: List[String]): IO[ExitCode] = {
val program = for {
q <- Queue.bounded[IO, (Double, String, String, String)](10)
b = new StreamTypeIntToDouble(q)
_ <- b.storeInQueue.compile.drain.start
_ <- b.getFromQueue.compile.drain
} yield ()
program.as(ExitCode.Success)
}
}
用于识别当前时间的方法实现如下
def currentTimeNow: String = {
val format = new SimpleDateFormat("dd-MM-yy hh:mm:ss")
format.format(Calendar.getInstance().getTime())
}
问题:如何跟踪进入和退出时间以构建新的输出流?