2

我有一个fs2.Stream由一些元素(可能是无限的)组成的,我想为流的所有元素同时安排一些计算。这是我尝试过的

implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
implicit val timer: Timer[IO]     = IO.timer(ExecutionContext.global)

val stream = for {
  id <- fs2.Stream.emits(List(1, 2)).covary[IO]
  _ <- fs2.Stream.awakeEvery[IO](1.second)
  _ <- fs2.Stream.eval(IO(println(id)))
} yield ()

stream.compile.drain.unsafeRunSync()

程序输出看起来像

1
1
1
etc...

这不是预期的。我想为原始流的所有元素交错调度计算,但不要等到第一个流终止(由于无限调度而永远不会发生)。

4

2 回答 2

1

根据@KrzysztofAtłasik 和@LuisMiguelMejíaSuárez 给出的提示,这是我刚刚提出的解决方案:

val originalStream = fs2.Stream.emits(List(1, 2))

val scheduledComputation = originalStream.covary[IO].map({ id =>
        fs2.Stream.awakeEvery[IO](1.second).evalMap(_ => IO.delay(println(id)))
}).fold(fs2.Stream.empty.covaryAll[IO, Unit])((result, stream) => result.merge(stream)).flatten

@KrzysztofAtłasik 在评论中提出的具有交错的解决方案 id <- fs2.Stream.emits(List(1, 2)).covary[IO]并且_ <- fs2.Stream.awakeEvery[IO](1.second)也有效,但它不允许以自己的方式安排每个元素。

要在几秒钟内同时安排元素,elementValue可以执行以下操作:

val scheduleEachElementIndividually = originalStream.covary[IO].map({ id =>
                                 //id.seconds
        fs2.Stream.awakeEvery[IO](id.second).evalMap(_ => IO.delay(println(id)))
}).fold(fs2.Stream.empty.covaryAll[IO, Unit])((result, stream) => result.merge(stream)).flatten
于 2020-07-14T17:46:43.130 回答
1
val str = for {
  id <- Stream.emits(List(1, 5, 7)).covary[IO]
  res = timer.sleep(id.second) >> IO(println(id))
} yield res

val stream =  str.parEvalMapUnordered(5)(identity)

stream.compile.drain.unsafeRunSync()

或者

 val stream = Stream.emits(List(1, 5, 7))
   .map { id => 
     Stream.eval(timer.sleep(id.second) >> IO(println(id))) }
   .parJoinUnbounded

stream.compile.drain.unsafeRunSync()
于 2020-07-14T17:49:13.970 回答