我正在构建一个将由第 3 方使用的库。在我的一种方法中,我返回Stream[Item]
的是从分页 REST API 调用的结果异步生成的。
我正在使用我对BulkPullerAsync的修改。我的代码在这里。
我希望我的流的接收者能够处理错误。根据文档,我应该使用 custom Supervision.Decider
。
val decider: Supervision.Decider = {
case ex =>
ex.printStackTrace()
Supervision.Stop
}
implicit val mat = ActorMaterializer(ActorMaterializerSettings(system).withSupervisionStrategy(decider))
不幸的是,它没有捕捉到我的 ActionPublisher 中抛出的异常。我看到它已处理,ActorPublisher.onError
已调用但未达到Supervision.Decider
. 它适用于文档中提供的简单 Stream。
如果我使用,错误也不会到达演员Sink.actorRef
。
我应该怎么办 ?我希望我的用户Stream
不应该依赖于其实施的性质。
UPD:为了实验,我尝试了以下示例
val stream = Source(0 to 5).map(100 / _)
stream.runWith(Sink.actorSubscriber(props))
在这种情况下,异常被Decider
.
UPD2:我试图通过Publisher
从不同类型的来源生成然后将它们转换回Source
预期所有错误都会Subscriber.onError
发生来欺骗它:)所以显然混合ActorPublisher + Decider有问题......
总的来说,我认为这是不一致的行为。我不能使用一种机制来处理Stream
.