3

阿卡新手。创建扩展的新 Scala 类SupervisorStrategy为我提供了以下模板:

class MySupervisorStrategy extends SupervisorStrategy {
  override def decider: Decider = ???

  override def handleChildTerminated(context: ActorContext, child: ActorRef,
    children: Iterable[ActorRef]): Unit = ???

  override def processFailure(context: ActorContext, restart: Boolean,
    child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit = ???
}

我正在寻找一种访问方式:

  1. 从儿童演员那里抛出的Throwable/Exception
  2. ActorRef抛出异常的子actor
  3. 传递给子actor的消息,提示抛出异常

认为Decider实际上是 a PartialFunction[Throwable,Directive]Throwable只要孩子抛出异常就会通过,但我没有看到我可以从上面的列表中访问 #2 和 #3 的位置。有任何想法吗?


更新

从发布的小提琴看来,有效Decider的是:

{
    case ActorException(ref,t,"stop")      =>
      println(s"Received 'stop' from ${ref}")
      Stop
    case ActorException(ref,t,"restart")      =>
      println(s"Received 'restart' from ${ref}")
      Restart
    case ActorException(ref,t,"resume")      =>
      println(s"Received 'resume' from ${ref}")
      Resume
}

在上面,我看到了所有三个:

  1. 孩子抛出的异常
  2. ref引发异常的子 ( )
  3. 最初发送给孩子的消息(导致抛出异常)

看起来没有什么需要在该类Decider中定义。我想把逻辑拉出来,说,并找到一种方法来重构它,以便它使用 的实例,所以可能类似于:SupervisorDeciderMyDecider.scalaSupervisorsupervisorStrategyMyDecider

class Supervisor extends Actor {
  import akka.actor.OneForOneStrategy
  import akka.actor.SupervisorStrategy._
  import scala.concurrent.duration._

  var child: ActorRef = _

  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute, decider = myDecider)

  ...
}
4

1 回答 1

3

对于#2,您可以访问发件人“如果在监督演员内部声明了策略

如果策略是在监督参与者内部(而不是在伴随对象内)声明的,它的决策者可以以线程安全的方式访问参与者的所有内部状态,包括获取对当前失败的子节点的引用(可作为发送者使用)的失败消息)

该消息不可用,因此唯一的选择是捕获您的异常并使用收到的消息抛出一个自定义异常。

这是一个快速的小提琴

class ActorSO extends Actor {

  def _receive: Receive = {
    case e =>
      println(e)
      throw new RuntimeException(e.toString)
  }

  final def receive = {
    case any => try {
      _receive(any)
    }
    catch {
      case t:Throwable => throw new ActorException(self,t,any)
    }

  }

}

更新

ADecider只是 aPartialFunction所以你可以在构造函数中传递它。

object SupervisorActor {
  def props(decider: Decider) = Props(new SupervisorActor(decider))
}

class SupervisorActor(decider: Decider) extends Actor {

  override val supervisorStrategy = OneForOneStrategy()(decider)

  override def receive: Receive = ???
}

class MyDecider extends Decider {
  override def isDefinedAt(x: Throwable): Boolean = true

  override def apply(v1: Throwable): SupervisorStrategy.Directive = {
    case t:ActorException => Restart
    case notmatched => SupervisorStrategy.defaultDecider.apply(notmatched)
  }
}

object Test {
  val myDecider: Decider = {
    case t:ActorException => Restart
    case notmatched => SupervisorStrategy.defaultDecider.apply(notmatched)
  }
  val myDecider2 = new MyDecider()
  val system = ActorSystem("stackoverflow")
  val supervisor = system.actorOf(SupervisorActor.props(myDecider))
  val supervisor2 = system.actorOf(SupervisorActor.props(myDecider2))
}

通过这样做,您将无法访问ActorRef引发异常的孩子的主管状态sender()(尽管我们将其包含在 中ActorException

关于从主管访问导致异常的子消息的原始问题,您可以在此处(来自 akka 2.5.3)看到 akka 开发人员选择不使其可用于决策。

  final protected def handleFailure(f: Failed): Unit = {
    // ¡¡¡ currentMessage.message is the one that cause the exception !!!
    currentMessage = Envelope(f, f.child, system)
    getChildByRef(f.child) match {
      /*
       * only act upon the failure, if it comes from a currently known child;
       * the UID protects against reception of a Failed from a child which was
       * killed in preRestart and re-created in postRestart
       */
      case Some(stats) if stats.uid == f.uid ⇒
        // ¡¡¡ currentMessage.message is not passed to the handleFailure !!!
        if (!actor.supervisorStrategy.handleFailure(this, f.child, f.cause, stats, getAllChildStats)) throw f.cause
      case Some(stats) ⇒
        publish(Debug(self.path.toString, clazz(actor),
          "dropping Failed(" + f.cause + ") from old child " + f.child + " (uid=" + stats.uid + " != " + f.uid + ")"))
      case None ⇒
        publish(Debug(self.path.toString, clazz(actor), "dropping Failed(" + f.cause + ") from unknown child " + f.child))
    }
  }
于 2018-05-17T17:09:17.183 回答