我有一个无限fs2.Stream
可能会遇到错误。我想跳过那些什么都不做(可能是日志)的错误并继续流式传输更多元素。例子:
//An example
val stream = fs2.Stream
.awakeEvery[IO](1.second)
.evalMap(_ => IO.raiseError(new RuntimeException))
在这种特定情况下,我希望每秒发射无限fs2.Stream
。Left(new RuntimeException)
有一种Stream.attempt
方法可以生成在遇到第一个错误后终止的流。有没有办法跳过错误并继续拉更多元素?
通常IO.raiseError(new RuntimeException).attempt
不会起作用,因为它需要在流管道组合的所有位置尝试所有效果。