4

我有一个无限fs2.Stream可能会遇到错误。我想跳过那些什么都不做(可能是日志)的错误并继续流式传输更多元素。例子:

//An example
val stream = fs2.Stream
    .awakeEvery[IO](1.second)
    .evalMap(_ => IO.raiseError(new RuntimeException))

在这种特定情况下,我希望每秒发射无限fs2.StreamLeft(new RuntimeException)

有一种Stream.attempt方法可以生成在遇到第一个错误后终止的流。有没有办法跳过错误并继续拉更多元素?

通常IO.raiseError(new RuntimeException).attempt不会起作用,因为它需要在流管道组合的所有位置尝试所有效果。

4

1 回答 1

6

没有办法以您描述的方式处理错误。当流遇到第一个错误时,它被终止。请检查这个gitter 问题

您可以通过两种方式处理它:

  1. 尝试效果(但您已经提到在您的情况下这是不可能的)。

  2. 终止后重新启动流:

val stream: Stream[IO, Either[Throwable, Unit]] = Stream
    .awakeEvery[IO](1.second)
    .evalMap(_ => IO.raiseError(new RuntimeException))
    .handleErrorWith(t => Stream(Left(t)) ++ stream) //put Left to the stream and restart it

//since stream will infinetelly restart I take only 3 first values
println(stream.take(3).compile.toList.unsafeRunSync()) 
于 2020-07-14T21:47:36.620 回答