1

我正在尝试将流的元素从队列顺序传递到另一个队列,以使队列一次可以接收一个元素。推入队列的结果应该被拉出并传递给下一个队列进行计算。该代码没有显示任何执行错误,但它停止了第一个队列(拉和推)。谁能解释我错过了什么.??

import cats.effect.{ExitCode, IO, IOApp, Timer}
import fs2.Stream
import fs2.concurrent.Queue

import scala.concurrent.duration._
import scala.util.Random

class StreamTypeIntToDouble(q1: Queue[IO, Int], q2: Queue[IO, String])(
  implicit timer: Timer[IO]
) {
  val streamData = Stream(1, 2, 3).covary[IO]
  val scheduledStream = Stream.fixedDelay(10.seconds) >> streamData

  def storeInQueueFirst: Stream[IO, Unit] = {
    scheduledStream
      .evalTap(n => IO.delay(println(s"Pushing $n to Queue First")))
      .metered(Random.between(1, 20).seconds)
      .through(q1.enqueue)

  }
  def getFromQueueFirst: Stream[IO, Int] = {
    q1.dequeue
      .evalTap(n => IO.delay(println(s"Pulling from queue First $n")))

  }

  def storeInQueueSecond(s: Stream[IO, Int]): Stream[IO, Unit] = {
    s.map { n =>
        n.toString
      }
      .metered(Random.between(1, 20).seconds)
      .evalTap(n => IO.delay(println(s"Pushing to queue second $n")))
      .through(q2.enqueue)
  }

  def getFromQueueSecond: Stream[IO, Unit] = {
    q2.dequeue
      .evalMap(_ => IO.delay(println("Pulling element from queue second")))
  }

}

object FiveTest extends IOApp {
  override def run(args: List[String]): IO[ExitCode] = {
    val program = for {
      q1 <- Queue.bounded[IO, Int](10)
      q2 <- Queue.bounded[IO, String](10)

      b = new StreamTypeIntToDouble(q1, q2)
      _ <- b.storeInQueueFirst.compile.drain.start
      a <- b.getFromQueueFirst.compile.lastOrError
      _ <- b.storeInQueueSecond(Stream(a)).compile.drain
      _ <- b.getFromQueueSecond.compile.drain
    } yield ()
    program.as(ExitCode.Success)
  }
}

输出如下:

Pushing 1 to Queue First
Pushing 2 to Queue First
Pushing 3 to Queue First
Pulling from queue First 1
Pulling from queue First 2
Pulling from queue First 3

预期的输出是:

Pushing 1 to Queue First
Pushing 2 to Queue First
Pushing 3 to Queue First
Pulling from queue First 1
Pulling from queue First 2
Pulling from queue First 3
Pushing to queue second 1
Pushing to queue second 2
Pushing to queue second 3
Pulling from queue second 1
Pulling from queue second 2
Pulling from queue second 3
4

1 回答 1

4

您似乎不了解有关流如何工作的一些事情。

当您运行stream.drain时,这意味着将处理此流的线程/光纤/whatver 将等到流终止。

所以当你有

_ <- b.storeInQueueSecond(Stream(a)).compile.drain

您实际上是在阻塞,并且在此步骤中的流读取所有元素,认为自己耗尽并关闭之前,下一个操作不会开始。(一旦流关闭,就无法让它再次运行,你必须创建一个新的)。

如果你想:

  • 把东西放在队列 1 上
  • 订阅队列 1 并将每个元素推送到队列 2
  • 订阅队列 2

你应该:

  • 并行执行这些步骤 - 这可以使用 Fibers 来实现(.start用于在新的 Fiber 中开始计算)
  • 以这样一种方式写入队列,即当所有元素都被接收和流式传输时,流将关闭(接收所有元素的条件必须明确定义,例如无终止队列/流 - 只要你愿意,你就可以Some(value)发送排队保持打开状态,None最后关闭它)

只有第二种方法可以让你有这样的排序输出,但它也会破坏使用流的目的,因为它只是将元素推送到队列 - 关闭流 - 拉取元素 - 关闭流 - 推送元素 - 关闭流 - 拉取元素- 关闭流,因此仅作为练习才有意义。如果你走第一种方式,它会是这样的

object FiveTest extends IOApp {
  override def run(args: List[String]): IO[ExitCode] = {
    for {
      q1 <- Queue.bounded[IO, Int](10)
      q2 <- Queue.bounded[IO, String](10)

      b = new StreamTypeIntToDouble(q1, q2)

      // q1 is started in a separate fiber to not block the next line
      _ <- b.storeInQueueFirst.compile.drain.start
      // output from q1 is redirected to q2 and stream is run in a separate fiber
      // to not not block running the next operation
      _ <- b.storeInQueueSecond(b.getFromQueueFirst).compile.drain.start
      // stream from q2 is blocking until stream completes...
      _ <- b.getFromQueueSecond.compile.drain
      // ... so that we won't return ExitCode if program is still supposed to run
    } yield ExitCode.Success
  }
}
Pushing 1 to Queue First
Pushing 2 to Queue First
Pulling from queue First 1
Pushing to queue second 1
Pulling element from queue second
Pulling from queue First 2
Pushing to queue second 2
Pulling element from queue second
...
于 2020-06-22T10:58:48.897 回答