我有一个使用小型 Akka 演员系统(使用 Java)的 Spring 应用程序,其中我有一个MasterActor
扩展 Akka 的 Akka AbstractActor
,它初始化 aRouter
并设置了一些工作演员。它还监视工人的生命周期。如果 Worker 演员因某些Exception
.
public MasterActor(ActorPropsFactory actorPropsFactory) {
this.actorPropsFactory = actorPropsFactory;
int workers = Runtime.getRuntime().availableProcessors() - 1;
List<Routee> routees = Stream.generate(this::createActorRefRoutee).limit(workers).collect(Collectors.toList());
this.router = new Router(new ConsistentHashingRoutingLogic(getContext().system()), routees);
}
private ActorRefRoutee createActorRefRoutee() {
ActorRef worker = getContext().actorOf(actorPropsFactory.create(getWorkerActorClass()));
getContext().watch(worker);
return new ActorRefRoutee(worker);
}
private void route(Object message, Supplier<String> routingKeySupplier) {
String routingKey = routingKeySupplier.get();
RouterEnvelope envelope = new ConsistentHashingRouter.ConsistentHashableEnvelope(message, routingKey);
router.route(envelope, getSender());
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(
EventMessage.class,
message -> this.route(message, () -> message.getEvent().getId().toString()))
.match(
Terminated.class,
message -> {
logger.info("WorkerActor {} terminated, restarting", message.getActor());
// todo: detect whether the system is shutting down before restarting the actor
router = router.removeRoutee(message.actor())
.addRoutee(createActorRefRoutee());
})
.build();
}
我遇到的问题是,如果 Spring 应用程序无法启动。(例如它无法连接到数据库,或者某些凭据不正确等),我正在接收Terminated
来自所有工作人员的消息,并且主演员尝试启动新的演员,这些演员也会Terminated
立即进入一个无限循环。
检测这种情况的正确方法是什么?有没有办法让主演员检测到演员系统正在关闭,以便工人不会再次重新启动?