0

我一直在尝试找出一种方法来控制具有某种外部(与演员系统分开)控制器的演员系统内传递的运行时消息。换句话说,给定一个参与者系统(我不想更改):我如何设置一种控制器来控制其中的消息传递?

例如,假设给定的 Actor 系统具有以下设置:

object Program extends App {
  val system = ActorSystem("system")
  val B = system.actorOf(Props[B], "B")
  val A = system.actorOf(Props(new A(B)), "A")
  A ! "Start"
  system.terminate()
}
class A(B: ActorRef) extends Actor {
  def receive  = {case "Start" => B ! "Message"}
}
class B extends Actor {
  def receive = {case "Message" => println("Some logic")}
}

我想完成以下任务:

  • 在单个线程上同步运行此程序
  • 对于系统内传递的每条消息:检查内容、发送者和接收者,并以此为基础;执行一些逻辑。

在上面的示例中,我想要一个“控制器”,它可以执行以下操作:

  1. 演员 A 从外部收到消息“开始”,并向演员 B 发送“消息”
  2. 在控制器上执行一些阻塞逻辑,即actor系统将空闲地等待这个逻辑被执行。
  3. 现在已经执行了逻辑,控制器向参与者系统发送绿灯以恢复消息传递。
  4. 演员 B 收到“消息”并打印“一些逻辑”
  5. 控制器检查参与者系统是否终止,它是,并执行一些额外的逻辑。

简而言之,我希望外部控制器能够在运行时控制 Actor 系统内的消息传递。

我在想这个控制器可能会使用调度程序、路由器参与者逻辑和期货来实现。我在 Akka 文档中没有找到任何关于此的示例,那么这甚至可以实现吗?

4

0 回答 0