队列中的顺序处理。fs2.Stream 提供了多种方法来控制元素的发射方式:
fixedRate、fixedDelay、awakeEvery、awakeDelay。
我想要的是顺序处理,也就是说,在队列中的元素被取出之前,不会有新的元素被推入队列。
我发现 fixedDelay 方法可以做到这一点,但是当我使用它时,它并没有按预期工作。这是我尝试过的:
import cats.effect.{ExitCode, IO, IOApp, Timer}
import fs2._
import fs2.concurrent.Queue
import scala.concurrent.duration._
import scala.util.Random
/** Producer/consumer pair built around the queue `q1`.
  *
  * Queue elements are pairs of a numeric payload and a clock-reading program.
  * The second component is an `IO[Long]` that is stored but never run inside
  * this class, which is why it shows up as `IO$<hash>` when a pair is printed.
  *
  * @param q1    queue shared by [[storeInQueue]] and [[getFromQueue]]
  * @param timer supplies both the clock and the sleep used by the consumer
  */
class Tst(q1: Queue[IO, (Double, IO[Long])])(
    implicit timer: Timer[IO]
) {

  // Fresh description of "read the wall clock in seconds" on every call.
  // Kept as a `def` on purpose: each tuple must carry its own IO instance.
  private def clockSeconds: IO[Long] =
    timer.clock.realTime(java.util.concurrent.TimeUnit.SECONDS)

  /** The payload emitted on every schedule tick: the single value `1`. */
  val streamData = Stream.emit(1)

  /** Replays [[streamData]] for each tick of `Stream.fixedDelay[IO](10.seconds)`. */
  val scheduledStream = Stream.fixedDelay[IO](10.seconds) >> streamData

  /** Logs each scheduled value, pairs it with a clock-reading program and
    * feeds the pair into `q1`.
    *
    * The log line is printed before the element is handed to `q1.enqueue`.
    */
  def storeInQueue: Stream[IO, Unit] = {
    val stamped = scheduledStream.evalMap { value =>
      IO.delay(println(s"Pushing $value to Queue"))
        .map(_ => (value.toDouble, clockSeconds))
    }
    stamped.through(q1.enqueue)
  }

  /** Takes elements off `q1`; for each one sleeps a random whole number of
    * seconds drawn from `Random.between(10, 30)`, then prints the payload
    * together with a new clock-reading program.
    *
    * The entry-time program that arrived with the element is discarded.
    */
  def getFromQueue: Stream[IO, Unit] =
    q1.dequeue.evalMap { case (payload, _) =>
      for {
        _ <- timer.sleep(Random.between(10, 30).seconds)
        leaving = (payload, clockSeconds)
        _ <- IO.delay(println(s"Pulling from queue $leaving"))
      } yield ()
    }
}
/** Entry point: wires a capacity-1 bounded queue into a [[Tst]] instance,
  * starts the producer stream on its own fiber and runs the consumer stream
  * on the main fiber.
  */
object Five2 extends IOApp {

  /** Builds and runs the producer/consumer program.
    *
    * @param args command-line arguments (unused)
    * @return `ExitCode.Success` once the consumer stream has drained
    */
  override def run(args: List[String]): IO[ExitCode] =
    Queue.bounded[IO, (Double, IO[Long])](1).flatMap { queue =>
      val pipeline = new Tst(queue)
      val producer = pipeline.storeInQueue.compile.drain
      val consumer = pipeline.getFromQueue.compile.drain

      // The producer fiber handle is intentionally dropped, as in the
      // original for-comprehension; only the consumer is awaited.
      producer.start
        .flatMap(_ => consumer)
        .map(_ => ())
        .as(ExitCode.Success)
    }
}
我得到这个:
Pushing 1 to Queue
Pushing 1 to Queue
Pulling from queue (1.0,IO$301748227)
Pushing 1 to Queue
Pulling from queue (1.0,IO$914911384)
Pushing 1 to Queue
Pulling from queue (1.0,IO$1022209005)
Pushing 1 to Queue
Pushing 1 to Queue
Pulling from queue (1.0,IO$1801897948)
预期输出:
Pushing 1 to Queue
Pulling from queue (1.0,IO$1022209005)
Pushing 1 to Queue
Pulling from queue (1.0,IO$1801897948)
Pushing 1 to Queue
Pulling from queue (1.0,IO$1801897948)