我正在尝试将流的元素从队列顺序传递到另一个队列,以使队列一次可以接收一个元素。推入队列的结果应该被拉出并传递给下一个队列进行计算。该代码没有显示任何执行错误,但它停止了第一个队列(拉和推)。谁能解释我错过了什么.??
import cats.effect.{ExitCode, IO, IOApp, Timer}
import fs2.Stream
import fs2.concurrent.Queue
import scala.concurrent.duration._
import scala.util.Random
class StreamTypeIntToDouble(q1: Queue[IO, Int], q2: Queue[IO, String])(
implicit timer: Timer[IO]
) {
val streamData = Stream(1, 2, 3).covary[IO]
val scheduledStream = Stream.fixedDelay(10.seconds) >> streamData
def storeInQueueFirst: Stream[IO, Unit] = {
scheduledStream
.evalTap(n => IO.delay(println(s"Pushing $n to Queue First")))
.metered(Random.between(1, 20).seconds)
.through(q1.enqueue)
}
def getFromQueueFirst: Stream[IO, Int] = {
q1.dequeue
.evalTap(n => IO.delay(println(s"Pulling from queue First $n")))
}
def storeInQueueSecond(s: Stream[IO, Int]): Stream[IO, Unit] = {
s.map { n =>
n.toString
}
.metered(Random.between(1, 20).seconds)
.evalTap(n => IO.delay(println(s"Pushing to queue second $n")))
.through(q2.enqueue)
}
def getFromQueueSecond: Stream[IO, Unit] = {
q2.dequeue
.evalMap(_ => IO.delay(println("Pulling element from queue second")))
}
}
object FiveTest extends IOApp {
override def run(args: List[String]): IO[ExitCode] = {
val program = for {
q1 <- Queue.bounded[IO, Int](10)
q2 <- Queue.bounded[IO, String](10)
b = new StreamTypeIntToDouble(q1, q2)
_ <- b.storeInQueueFirst.compile.drain.start
a <- b.getFromQueueFirst.compile.lastOrError
_ <- b.storeInQueueSecond(Stream(a)).compile.drain
_ <- b.getFromQueueSecond.compile.drain
} yield ()
program.as(ExitCode.Success)
}
}
输出如下:
Pushing 1 to Queue First
Pushing 2 to Queue First
Pushing 3 to Queue First
Pulling from queue First 1
Pulling from queue First 2
Pulling from queue First 3
预期的输出是:
Pushing 1 to Queue First
Pushing 2 to Queue First
Pushing 3 to Queue First
Pulling from queue First 1
Pulling from queue First 2
Pulling from queue First 3
Pushing to queue second 1
Pushing to queue second 2
Pushing to queue second 3
Pulling from queue second 1
Pulling from queue second 2
Pulling from queue second 3