0

以下代码应该putStrLn并行执行效果,因为mapMPar

val runtime = zio.Runtime.default
val foo = ZIO.sleep(5.second) *> ZIO("foo")
val bar = ZIO("bar")

val k = ZStream.fromEffect(foo) ++ ZStream.fromEffect(bar)
val r = k.mapMPar(3)(x => console.putStrLn(s"Processing `${x}`"))

runtime.unsafeRun(r.runDrain)

但实际上无论如何它总是foo先处理。bar我错过了什么还是一个错误?

4

1 回答 1

4

我认为您的示例并没有按照您的预期进行。fromEffect创建一个基本上说“我有一个最终会生成单个项目的效果”的流,然后第一个流在生成该项目之前等待 5 秒。由于流的性质,++orconcat运算符是惰性的,这意味着它无法开始处理,直到从第一个流中消耗完所有项目(这不会在 5 秒内发生)。结果,您的流看起来像这样:

--5s--(foo)(bar)|

而不是我想象的你认为它应该像的那样:

(bar)--5s--(foo)|

考虑它的最佳方式可能是,对于大多数河流,您有一条单车道高速公路,一次只能移动一个项目,并且所有后续项目都被线路头部的项目阻挡。一旦你撞到那个Par街区,你就会打开多条车道,这意味着更快移动的东西可能会超车。

因此,我可以通过执行以下操作来实现所需的行为:

val k = ZStream("foo", "bar")
val r = k.mapMPar(3)(x => putStrLn(s"$x:enter") *> (ZIO.sleep(5.second) *> putStrLn(s"Processing `${x}`")) <* putStrLn(s"$x:exit"))

r.runDrain

或者写得更紧凑:

ZStream("foo", "bar").mapMPar(3)(x => for {
  _ <- putStrLn(s"$x:enter")
  _ <- ZIO.sleep(5.seconds) *> putStrLn(s"Processing `$x`")
  _ <- putStrLn(s"$x:exit")
} yield ()).runDrain
于 2020-08-18T03:26:03.747 回答