我有一个使用隐藏的演员。有时,当它崩溃时,它会丢失所有隐藏的消息。我发现这取决于我使用的监督逻辑。
我写了一个简单的例子。
藏匿的演员:
case object WrongMessage
case object TestMessage
case object InitialMessage
class TestActor extends Actor with Stash {
override def receive: Receive = uninitializedReceive
def uninitializedReceive: Receive = {
case TestMessage =>
println(s"stash test message")
stash()
case WrongMessage =>
println(s"wrong message")
throw new Throwable("wrong message")
case InitialMessage =>
println(s"initial message")
context.become(initializedReceive)
unstashAll()
}
def initializedReceive: Receive = {
case TestMessage =>
println(s"test message")
}
}
在以下代码中,TestActor
永远不会收到 stashed TestMessage
:
object Test1 extends App {
implicit val system: ActorSystem = ActorSystem()
val actorRef = system.actorOf(BackoffSupervisor.props(Backoff.onFailure(
Props[TestActor], "TestActor", 1 seconds, 1 seconds, 0
).withSupervisorStrategy(OneForOneStrategy()({
case _ => SupervisorStrategy.Restart
}))))
actorRef ! TestMessage
Thread.sleep(5000L)
actorRef ! WrongMessage
Thread.sleep(5000L)
actorRef ! InitialMessage
}
但是这段代码运行良好:
class SupervisionActor extends Actor {
val testActorRef: ActorRef = context.actorOf(Props[TestActor])
override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy()({
case _ => SupervisorStrategy.Restart
})
override def receive: Receive = {
case message => testActorRef forward message
}
}
object Test2 extends App {
implicit val system: ActorSystem = ActorSystem()
val actorRef = system.actorOf(Props(classOf[SupervisionActor]))
actorRef ! TestMessage
Thread.sleep(5000L)
actorRef ! WrongMessage
Thread.sleep(5000L)
actorRef ! InitialMessage
}
我查看了来源,发现演员监督使用
了由系统调度程序逻辑支持的LocalActorRef.restart方法,但只是在旧演员终止后创建了一个新演员。有没有办法解决它?BackoffSupervisor