我有长时间运行的进程,比如说 2 小时到 1 天。每个进程都以更新消息开始其生命周期,然后继续侦听进一步的并发更新。在更新消息中有一个唯一的目标标识符。
如果我想用一个 Actor 来表示每个进程,我该如何初始化这个 Actor?我显然需要根据更新消息中标识符的值进行原子查找/创建操作?如何用 scala/akka 设计这个?
设置一个执行这些流程参与者管理的参与者(比如一个参与者ProcessManager
)。该参与者将支持获取特定流程 ID 的流程参与者的请求。在内部,它将查看该孩子是否已经存在。如果存在,它将sender
使用该 ref 响应。如果没有,它将创建它,然后sender
使用该 ref 响应。因为这个管理器actor连续处理它的邮箱(就像所有actor一样),所以你不必担心查找/创建的竞争条件。一个非常简化的例子:
case class GetProcessHandler(processId:Int)
class ProcessManager extends Actor{
def receive = {
case GetProcessHandler(id) =>
val name = s"proc_$id"
val handler = context.child(name).getOrElse(
context.actorOf(Props[ProcessHandler], name)
)
sender ! handler
}
}
class ProcessHandler extends Actor{
def receive = {
...
}
}
您可以在application.conf
. 然后您的main
程序可以使用您的 ActorSystem 创建/初始化这些演员。