1

我有一个源,它使用 aflatMapConcat来合并通过解析 Json 生成的新元素流。为此,我使用了 Alpakka 提供的 JsonReader ( https://developer.lightbend.com/docs/alpakka/current/data-transformations/json.html )。这很好用,但我想处理将无效 Json 呈现给读者的情况。在这种情况下,我希望异常冒泡到主管,以便它可以重新启动流。

这是一个展示错误 json 案例的快速片段:

val systemErrorHandler: Supervision.Decider = {
  case e: Exception =>
    println("System Error")
    Supervision.Restart
  case _ => Supervision.Stop
}

val routeErrorHandler: Supervision.Decider = { 
  case e: Exception =>
    println("Route Error")
    Supervision.Restart
}

val exceptionStrategy = ActorAttributes.supervisionStrategy(routeErrorHandler)

implicit val actorSystem: ActorSystem = ActorSystem()
implicit val materializer: Materializer = ActorMaterializer(ActorMaterializerSettings(actorSystem).withSupervisionStrategy(systemErrorHandler))


val json = """{"test":"one}"""

val route = Source.single("1").flatMapConcat { _ =>
  Source.single(ByteString(json))
    .via(JsonReader.select("$")).map(_.utf8String)
}.to(Sink.foreach(println)).withAttributes(exceptionStrategy)

route.run

我希望 JsonReader 在抛出异常时失败,然后将异常冒泡到routeErrorHandler. 然而,当运行 JsonReader 阶段失败并且异常被吞下。我还尝试recover在流中放置一个以将异常抛出到“更高”级别(在子流之外),但这也不会被 routeErrorHandler 主管或放置在 ActorMaterializer 上的主管处理。

这是预期的行为吗?如果是这样,确保子流中的异常到达主管策略的正确方法是什么?

4

0 回答 0