6

我正在构建一个将由第 3 方使用的库。在我的一种方法中,我返回Stream[Item]的是从分页 REST API 调用的结果异步生成的。

我正在使用我对BulkPullerAsync的修改。我的代码在这里

我希望我的流的接收者能够处理错误。根据文档,我应该使用 custom Supervision.Decider

val decider: Supervision.Decider = {
  case ex =>
    ex.printStackTrace()
    Supervision.Stop
}

implicit val mat = ActorMaterializer(ActorMaterializerSettings(system).withSupervisionStrategy(decider))

不幸的是,它没有捕捉到我的 ActionPublisher 中抛出的异常。我看到它已处理,ActorPublisher.onError已调用但未达到Supervision.Decider. 它适用于文档中提供的简单 Stream。

如果我使用,错误也不会到达演员Sink.actorRef

我应该怎么办 ?我希望我的用户Stream不应该依赖于其实施的性质。

UPD:为了实验,我尝试了以下示例

val stream = Source(0 to 5).map(100 / _)
stream.runWith(Sink.actorSubscriber(props))

在这种情况下,异常被Decider.

UPD2:我试图通过Publisher从不同类型的来源生成然后将它们转换回Source预期所有错误都会Subscriber.onError发生来欺骗它:)所以显然混合ActorPublisher + Decider有问题......

总的来说,我认为这是不一致的行为。我不能使用一种机制来处理Stream.

4

0 回答 0