1

我有两个演员:

ProcessManager处理系统中的一些流程(例如,用户注册、购买等)

通知程序- 如果 ProcessManager 发生错误,应通知用户。我需要捕获 ProcessManager 参与者的失败(它失败并因任何原因停止,例如,由于 ActorInitializationException 或达到最大重启时间并且流程管理器参与者已停止)。

   class ProcessManager extends Actor {
      override def receive: Receive = {
        ...
      }
    }

    class Notifier extends Actor {
      override def receive: Receive = {
        PROCESS MANAGER ACTOR FAILED AND STOPPED =>
          // Here I need to catch failure of ProcessManager actor
          // (it was failed and stopped for what ever
          // reason, for example, because of ActorInitializationException
          // or max restart time reached and Process manager actor was stopped).
          //
          // Then do some stuff, for example, send message to the client via web socket.
      }
    }


    class MyController @Inject() (cc: ControllerComponents, actorSystem: ActorSystem)
      (implicit exec: ExecutionContext) extends AbstractController(cc)  {


      // I need to catch failure of processManager in this actor.
      val notifier = actorSystem.actorOf(Props(classOf[Notifier]))

      def registerUser = Action.async {         

          // Actor may be stopped because of ActorInitializationException here
          val processManager = actorSystem.actorOf(Props(classOf[ProcessManager]))
              ...

         // OR it may be stopped here for any reason.
         processManager ! "some message which will fail and stop pm actor"

         Future.successfull(Ok("Thanks."))   
       }
   }

如何在Notifier角色中捕获ProcessManager角色的终止(由于失败) ?

编辑 让我解释一下我的问题的背景。

我在 Play 控制器中创建 PM actor 并将消息发送给它(Tell),然后我立即向用户返回 Ok 响应。PM actor 创建另一个子actor,并在创建过程中抛出 ActorInitializationException。我需要通知用户(通过 Web 套接字,使用 Notifier Actor)出现问题。

4

1 回答 1

3

当actor永久停止时,您可以使用DeathWatch注册Notifieractor来接收Terminated消息。将需要对参与者的引用,并且一种方法是将引用作为消息发送(这是安全的,因为它是不可变的和可序列化的)。ProcessManagerNotifierProcessManagerDeathWatchProcessManagerActorRef

class Notifier extends Actor {
  var processManager: Option[ActorRef] = None

  def receive: Receive = {
    case aRef: ActorRef =>
      if (processManager.isEmpty) {
        processManager = Some(aRef)
        context.watch(aRef) // register to "watch" the process manager
      }
    case Terminated =>
      // process manager was permanently stopped
    case ...
  }
}

object Demo extends App {
  val actorSystem = ActorSystem("my-actor-system")

  val notifier = actorSystem.actorOf(Props(classOf[Notifier]))
  val processManager = actorSystem.actorOf(Props(classOf[ProcessManager]))

  notifier ! processManager // send processManager's ref to the notifier
  ...
  processManager ! "some message which will fail and stop pm actor"
  ...
}

一个警告:在DeathWatch尝试ActorInitializationException创建ProcessManager.


如果您需要Notifier在子级ProcessManager抛出异常时发送消息,则覆盖其中的主管策略ProcessManager并将此消息作为策略的一部分发送。就像是:

class ProcessManager extends Actor {
  import akka.actor.OneForOneStrategy
  import akka.actor.SupervisorStrategy._
  import scala.concurrent.duration._

  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
      case _: ActorInitializationException =>
        val notifier = context.actorSelection("/path/to/notifier")
        notifier ! CustomErrorMessage
        Stop
      case _: Exception => Escalate
   }

   def receive: Receive = {
     ...
   }
}
于 2017-07-21T00:45:58.917 回答