我有一个fs2.Stream
由一些元素(可能是无限的)组成的,我想为流的所有元素同时安排一些计算。这是我尝试过的
implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)
val stream = for {
id <- fs2.Stream.emits(List(1, 2)).covary[IO]
_ <- fs2.Stream.awakeEvery[IO](1.second)
_ <- fs2.Stream.eval(IO(println(id)))
} yield ()
stream.compile.drain.unsafeRunSync()
程序输出看起来像
1
1
1
etc...
这不是预期的。我想为原始流的所有元素交错调度计算,但不要等到第一个流终止(由于无限调度而永远不会发生)。