我遇到了 FS2 和异常处理的问题。我想要的是,给定一个Stream[IO,A]
,当我使用一个f: A => B
可以抛出异常的映射它时,我得到一个Stream[IO,Either[Throwable,B]]
.
我尝试了以下方法,它按预期工作:
import cats.effect.IO
val x1 = fs2.Stream.emits(Vector(1,2,3,4)).covary[IO]
.map(x => x * x)
.map{ i => if(i == 9) throw new RuntimeException("I don't like 9s") else i}
.attempt
x1.compile.toVector.unsafeRunSync().foreach(println)
它打印:
Right(1)
Right(4)
Left(java.lang.RuntimeException: I don't like 9s)
但是,当我尝试对此做任何事情时,我的问题就开始了Stream
。
val x1 = fs2.Stream.emits(Vector(1,2,3,4)).covary[IO]
.map(x => x * x)
.map{ i => if(i == 9) throw new RuntimeException("I don't like 9s") else i}
.attempt.map(identity)
x1.compile.toVector.unsafeRunSync().foreach(println)
出现异常并终止应用程序:
java.lang.RuntimeException: I don't like 9s
at swaps.fm.A$A32$A$A32.$anonfun$x1$2(tmp2.sc:7)
at scala.runtime.java8.JFunction1$mcII$sp.apply(tmp2.sc:8)
...
更奇怪take
的是,过去Stream
只返回我知道没问题的元素,仍然以同样的方式爆炸:
val x1 = fs2.Stream.emits(Vector(1,2,3,4)).covary[IO]
.map(x => x * x)
.map{ i => if(i == 9) throw new RuntimeException("I don't like 9s") else i}
.attempt.take(2)
x1.compile.toVector.unsafeRunSync().foreach(println)
任何人都可以澄清为什么会这样吗?这是错误还是(未)预期的行为?
注意此行为存在于 FS2 0.10.0-M7和0.10.0