3

我遇到了 FS2 和异常处理的问题。我想要的是,给定一个Stream[IO,A],当我使用一个f: A => B可以抛出异常的映射它时,我得到一个Stream[IO,Either[Throwable,B]].

我尝试了以下方法,它按预期工作:

import cats.effect.IO
val x1 = fs2.Stream.emits(Vector(1,2,3,4)).covary[IO]
  .map(x => x * x)
  .map{ i => if(i == 9) throw new RuntimeException("I don't like 9s") else i}
  .attempt
x1.compile.toVector.unsafeRunSync().foreach(println)

它打印:

Right(1)
Right(4)
Left(java.lang.RuntimeException: I don't like 9s)

但是,当我尝试对此做任何事情时,我的问题就开始了Stream

val x1 = fs2.Stream.emits(Vector(1,2,3,4)).covary[IO]
  .map(x => x * x)
  .map{ i => if(i == 9) throw new RuntimeException("I don't like 9s") else i}
  .attempt.map(identity)

x1.compile.toVector.unsafeRunSync().foreach(println)

出现异常并终止应用程序:

java.lang.RuntimeException: I don't like 9s
    at swaps.fm.A$A32$A$A32.$anonfun$x1$2(tmp2.sc:7)
    at scala.runtime.java8.JFunction1$mcII$sp.apply(tmp2.sc:8)
    ...

更奇怪take的是,过去Stream只返回我知道没问题的元素,仍然以同样的方式爆炸:

val x1 = fs2.Stream.emits(Vector(1,2,3,4)).covary[IO]
  .map(x => x * x)
  .map{ i => if(i == 9) throw new RuntimeException("I don't like 9s") else i}
  .attempt.take(2)

x1.compile.toVector.unsafeRunSync().foreach(println)

任何人都可以澄清为什么会这样吗?这是错误还是(未)预期的行为?

注意此行为存在于 FS2 0.10.0-M70.10.0

4

2 回答 2

2

这里的问题是要使用fs2你必须编写纯代码。抛出异常并不纯粹,因此如果您希望管道中的某个步骤可能失败,则需要明确说明。这里有两种方法:

import cats.effect.IO
val x1 = fs2.Stream.emits(Vector(1,2,3,4)).covary[IO]
  .map(x => x * x)
  .map{ i => if(i == 9) Left[Throwable, Int](new RuntimeException("I don't like 9s")) else Right(i)}
x1.compile.toVector.unsafeRunSync().foreach(println)
// Explicit Left annotation is so you can .rethrow if desired; it can be omitted or added later with .widen

或者

import cats.effect.IO
val x1 = fs2.Stream.emits(Vector(1,2,3,4)).covary[IO]
  .map(x => x * x)
  .flatMap { i => if(i == 9) Stream.raiseError(new RuntimeException("I don't like 9s")) else Stream.emit(i) }
  .attempt
x1.compile.toVector.unsafeRunSync().foreach(println)

其中第一个更可取,因为这flatMapemit导致通常效率较低的 size-1 块。如果要在出现第一个错误时停止处理,请.rethrow在流的末尾添加 a。

于 2018-06-19T18:12:28.420 回答
1

看来问题出在这里:

self
 .stage(depth.increment, defer, o => emit(f(o)), os => {
   var i = 0; while (i < os.size) { emit(f(os(i))); i += 1; }

这是里面的代码Segment.map。当您使用以下方法分配向量时:

Stream.emits(Vector(1,2,3,4))

fs2 将分配一个段。看上面的代码mapos.size代表segment的大小,意思是,map总是会映射整个segment的大小。这意味着即使您问过您take(2),我们仍在有效地映射整个细分市场。

我们可以通过稍微修改一下代码来证明这一点:

def main(args: Array[String]): Unit = {
  val x1 = fs2.Stream
    .emits(Vector(1, 2, 3, 4))
    .segmentLimit(1)
    .covary[IO]
    .map { seg =>
      if (seg.sum.force.run > 3) throw new RuntimeException("I don't like 9s")
      else seg
    }
    .attempt
    .take(2)

println(x1.compile.toVector.unsafeRunSync())

这里的重要部分是segmentLimit,它使流将内部流动的数据分块为大小为 1 的段。当我们运行这段代码时,我们得到:

Vector(Right(Chunk(1)), Right(Chunk(2)))

这是一个错误吗?没有把握。我会咨询 Gitter 频道的维护人员。

于 2018-02-13T12:30:46.077 回答