我有一个非常简单的示例,其中我有一个 Actor ( SimpleActor),它通过向自身发送消息来执行周期性任务。消息在actor的构造函数中调度。在正常情况下(即没有故障)一切正常。

但是如果 Actor 必须处理故障怎么办。我还有另一个演员 ( SimpleActorWithFault)。这个演员可能有缺点。在这种情况下,我自己通过抛出异常来生成一个。当发生故障(即SimpleActorWithFault引发异常)时,它会自动重新启动。然而,这个重新启动会弄乱 Actor 内部的调度程序,它不再作为例外工作。如果故障发生得足够快,它会产生更多意想不到的行为。

我的问题是在这种情况下处理故障的首选方法是什么?我知道我可以使用Try块来处理异常。但是,如果我正在扩展另一个无法在超类中放置 Try 的演员,或者当我是演员的例外错误时,该怎么办。

import akka.actor.{Props, ActorLogging}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import akka.actor.Actor

case object MessageA

case object MessageToSelf

class SimpleActor extends Actor with ActorLogging {

  //schedule a message to self every second
  context.system.scheduler.schedule(0 seconds, 1 seconds, self, MessageToSelf)

  //keeps track of some internal state
  var count: Int = 0

  def receive: Receive = {
    case MessageA => {
      log.info("[SimpleActor] Got MessageA at %d".format(count))
    case MessageToSelf => {
      //update state and tell the world about its current state 
      count = count + 1
      log.info("[SimpleActor] Got scheduled message at %d".format(count))



class SimpleActorWithFault extends Actor with ActorLogging {

  //schedule a message to self every second
  context.system.scheduler.schedule(0 seconds, 1 seconds, self, MessageToSelf)

  var count: Int = 0

  def receive: Receive = {
    case MessageA => {
      log.info("[SimpleActorWithFault] Got MessageA at %d".format(count))
    case MessageToSelf => {
      count = count + 1
      log.info("[SimpleActorWithFault] Got scheduled message at %d".format(count))

      //at some point generate a fault
      if (count > 5) {
        log.info("[SimpleActorWithFault] Going to throw an exception now %d".format(count))
        throw new Exception("Excepttttttiooooooon")


object MainApp extends App {
  implicit val akkaSystem = akka.actor.ActorSystem()
  //Run the Actor without any faults or exceptions 

  //comment the above line and uncomment the following to run the actor with faults  


正确的方法是将冒险行为推到它自己的演员身上。这种模式称为错误内核模式(参见 Akka 并发,第 8.5 节):






 |- ActorWithFault

这里SimpleActor充当. _ _ 任何ActorWithFault演员的默认监督策略是重新启动孩子并升级其他任何事情: http ://doc.akka.io/docs/akka/snapshot/scala/fault-tolerance.htmlException


class SimpleActor {
  override def preStart(){
    // our faulty actor --- we will supervise it from now on
    context.actorOf(Props[ActorWithFault], "FaultyActor") 

  override val supervisorStrategy = OneForOneStrategy () {
    case _: ActorKilledException => Escalate
    case _: ActorInitializationException => Escalate
    case _ => Restart // keep restarting faulty actor

class SimpleActor extends Actor with ActorLogging {

  private var cancellable: Option[Cancellable] = None

  override def preStart() = {
    //schedule a message to self every second
    cancellable = Option(context.system.scheduler.schedule(0 seconds, 1 seconds, self, MessageToSelf))

  override def postStop() = {
    cancellable = None

正确处理异常(akka.actor.Status.Failure 用于在发件人使用 Ask 模式的情况下正确回答问题):

def receive: Receive = {
    case MessageA => {
      try {
        log.info("[SimpleActor] Got MessageA at %d".format(count))
      } catch {
        case e: Exception =>
          sender ! akka.actor.Status.Failure(e)
          log.error(e.getMessage, e)
