我认为您的示例并没有按照您的预期进行。fromEffect
创建一个基本上说“我有一个最终会生成单个项目的效果”的流,然后第一个流在生成该项目之前等待 5 秒。由于流的性质,++
orconcat
运算符是惰性的,这意味着它无法开始处理,直到从第一个流中消耗完所有项目(这不会在 5 秒内发生)。结果,您的流看起来像这样:
--5s--(foo)(bar)|
而不是我想象的你认为它应该像的那样:
(bar)--5s--(foo)|
考虑它的最佳方式可能是,对于大多数河流,您有一条单车道高速公路,一次只能移动一个项目,并且所有后续项目都被线路头部的项目阻挡。一旦你撞到那个Par
街区,你就会打开多条车道,这意味着更快移动的东西可能会超车。
因此,我可以通过执行以下操作来实现所需的行为:
val k = ZStream("foo", "bar")
val r = k.mapMPar(3)(x => putStrLn(s"$x:enter") *> (ZIO.sleep(5.second) *> putStrLn(s"Processing `${x}`")) <* putStrLn(s"$x:exit"))
r.runDrain
或者写得更紧凑:
ZStream("foo", "bar").mapMPar(3)(x => for {
_ <- putStrLn(s"$x:enter")
_ <- ZIO.sleep(5.seconds) *> putStrLn(s"Processing `$x`")
_ <- putStrLn(s"$x:exit")
} yield ()).runDrain