我正在构建一个将由第三方使用的库。在我的方法之一中,我返回Stream[Item]
它是根据分页 REST API 调用的结果异步生成的。
我正在使用我的修改BulkPuller异步。我的代码是here.
我希望我的流的接收者能够处理错误。根据文档我应该使用自定义Supervision.Decider
.
val decider: Supervision.Decider = {
case ex =>
ex.printStackTrace()
Supervision.Stop
}
implicit val mat = ActorMaterializer(ActorMaterializerSettings(system).withSupervisionStrategy(decider))
不幸的是,它无法捕获我的 ActionPublisher 中引发的异常。我看到它处理了,ActorPublisher.onError
被调用但未到达Supervision.Decider
。它与文档中提供的简单 Stream 一起使用。
如果我使用,错误也不会到达演员Sink.actorRef
.
我应该怎么办 ?我希望我的用户Stream
不应取决于其实施的性质。
UPD:为了实验的目的,我尝试了以下示例
val stream = Source(0 to 5).map(100 / _)
stream.runWith(Sink.actorSubscriber(props))
在这种情况下,异常被捕获Decider
.
UPD2:我试图通过生产来欺骗它Publisher
来自不同类型的来源,然后将它们转换回Source
期待所有错误发生Subscriber.onError
这确实发生了:)所以显然混合 ActorPublisher+Decider 出了问题......
总的来说,我认为这是不一致的行为。我无法使用一种机制来处理错误Stream
.