我在您的代码中发现了几个问题:
- 如果您收到有关缺少隐式的错误,您通常可以通过显式声明类型参数来修复它们:
q <- Queue.bounded[IO, Unit](10) // it will fix your error with implicits
- 您的 for comprehension 的结果类型是
IO[Unit]
,但为了使其运行,您必须从run
方法返回它。您还需要将类型从 unit 更改为ExitCode
:
stream.as(ExitCode.Success)
- 在您的方法中
startPushingToQueue
,您正在创建Steam
但没有在任何地方分配它。它只会创建流的描述,但不会运行。
我认为您想要实现的是创建将元素推送到队列的方法和另一个从队列中获取元素并打印它们的方法。请检查我的解决方案:
import cats.effect.{ ExitCode, IO, IOApp}
import fs2._
import fs2.concurrent.Queue
import scala.concurrent.duration._
class QueueInt(q: Queue[IO, Int])(implicit timer: Timer[IO]) { //I need implicit timer for metered
def startPushingToQueue: Stream[IO, Unit] = Stream(1, 2, 3)
.covary[IO]
.evalTap(n => IO.delay(println(s"Pushing element $n to Queue"))) //eval tap evaluates effect on an element but doesn't change stream
.metered(500.millis) //it will create 0.5 delay between enqueueing elements of stream,
// I added it to make visible that elements can be pushed and pulled from queue concurrently
.through(q.enqueue)
def pullAndPrintElements: Stream[IO, Unit] = q.dequeue.evalMap(n => IO.delay(println(s"Pulling element $n from Queue")))
}
object testingQueues extends IOApp {
override def run(args: List[String]): IO[ExitCode] = {
val program = for {
q <- Queue.bounded[IO, Int](10)
b = new QueueInt(q)
_ <- b.startPushingToQueue.compile.drain.start //start at the end will start running stream in another Fiber
_ <- b.pullAndPrintElements.compile.drain //compile.draing compiles stream into io byt pulling all elements.
} yield ()
program.as(ExitCode.Success)
}
}
在控制台上,您将看到有关从队列中推入和拉出交错的行。如果您删除start
,您将看到startPushingToQueue
在推送所有元素后首先从完成中流出,然后才pullAndPrintElements
开始。
如果您正在寻找学习 fs2 的良好资源,我建议您应该先查看与 fs2 相关的讲座。比旧的更喜欢新的谈话,因为他们可以引用旧的 API。
您还应该查看fs2 文档指南。