为了避免 OOM,我使用共享的自定义调度程序来限制我的一些 Akka 1.1.3 演员的邮箱大小。例如:
object Static {
val dispatcher = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher(
"customDispatcher",
1000,
BoundedMailbox(capacity = 10)
)
}
class MyActor extends Actor {
self.dispatcher = Static.dispatcher
...
}
我想对邮箱溢出做出反应,以便我可以向上游生产者发送消息以暂停(旁注:可悲的是,它看起来像actor.stop()
,等待,并且actor.start()
会抛出一个ActorStartException
)。在队列填满和队列稍微耗尽之间,一些数据丢失是可以接受的。
Akka关于调度程序的章节说
当尝试向 Actor 发送消息时,如果在 pushTimeout 指定的时间内无法将消息添加到邮箱,它将抛出 MessageQueueAppendFailedException(“BlockingMessageTransferQueue transfer timed out”)。
我在哪里可以捕捉到这个异常?
该文档听起来像是我需要将每个都包装myActor ! message
在 try/catch 中。那正确吗?我真的很想集中处理它。我Supervisor
可以拦截它并运行我的处理程序吗?