1

我是 ZHub 和 ZStream 的新手,想熟悉他们的 API。

不幸的是,我无法使这个简单的示例工作:

for
    hub <- Hub.bounded[String](4)
    stream = ZStream.fromHub(hub)
    _ <- hub.publish("Hello")
    _ <- hub.publish("World")
    collected <- stream.runCollect
    _ <- ZIO.foreach(collected) { msg => console.putStrLn(msg) }
yield
    ()

我怀疑这个程序不会终止,因为我正在尝试收集无限流。我还尝试使用stream.tap(...)或关闭集线器打印消息。没有任何帮助。

我在这里想念什么?任何帮助表示赞赏,谢谢。

4

1 回答 1

1

@adamgfraser 在 GitHub 上提供了一个工作示例:

import zio._
import zio.stream._

object Example extends App {

  def run(args: List[String]): URIO[ZEnv, ExitCode] =
    for {
      promise <- Promise.make[Nothing, Unit]
      hub     <- Hub.bounded[String](2)
      stream = ZStream.managed(hub.subscribe).flatMap { queue =>
                 ZStream.fromEffect(promise.succeed(())) *>
                   ZStream.fromQueue(queue)
               }
      fiber     <- stream.take(2).runCollect.fork
      _         <- promise.await
      _         <- hub.publish("Hello")
      _         <- hub.publish("World")
      collected <- fiber.join
      _         <- ZIO.foreach(collected)(console.putStrLn(_)).orDie
    } yield ExitCode.success
}

我的错误是在等待订阅完成之前将值发布到集线器。

于 2021-05-07T15:50:07.697 回答