我正在关注来自 akka.io 容错的代码http://doc.akka.io/docs/akka/current/java/fault-tolerance.html
.我已将此代码作为参考。我的要求如下:假设演员在一条消息上崩溃并由他的主管重新启动。然后他开始处理他邮箱中的下一条消息。导致崩溃的消息被“丢弃”。但我想在特定时间(假设 3 次)内处理相同的操作,并且它们之间有定义的间隔(假设 1 秒)。如何使用 akka 监督来执行此操作。实际上通过演员我正在尝试检查特定服务api是否正在工作(即给出一些异常)。因此,如果特定尝试有任何异常(假设未找到404),请将消息重新发送给失败的工作人员,直到maxNrOfRetries 由 supervisorStrategy 指定。如果工人失败了“maxNrOfRetries”次,那么就记录下来“
我的主管班:
public class Supervisor extends UntypedActor {
private static SupervisorStrategy strategy =
new OneForOneStrategy(3, Duration.create("1 minute"),
new Function<Throwable, Directive>() {
@Override
public Directive apply(Throwable t) {
if (t instanceof Exception) {
return restart();
}else if (t instanceof IllegalArgumentException) {
return stop();
} else {
return escalate();
}
}
});
@Override
public SupervisorStrategy supervisorStrategy() {
return strategy;
}
public void onReceive(Object o) {
if (o instanceof Props) {
getSender().tell(getContext().actorOf((Props) o), getSelf());
} else {
unhandled(o);
}
}
}
儿童班:
public class Child extends UntypedActor {
public void onReceive(Object o) throws Exception {
if (o instanceof String) {
Object response = someFunction( (String) message);//this function returns either successfull messgae as string or exception
if(response instanceOf Exception) {
throw (Exception) response;
}
else
getSender().tell(response, getSelf())
}else {
unhandled(o);
}
}
}
创建演员:
Props superprops = Props.create(Supervisor.class);
ActorRef supervisor = system.actorOf(superprops, "supervisor");
ActorRef child = (ActorRef) Await.result(ask(supervisor,
Props.create(Child.class), 5000), timeout);
child.tell("serVice_url", ActorRef.noSender());
对于 service_url,如果发生故障,我想重复该过程。但它没有发生。如果在 creatng actor 中写入下一行,child.tell("serVice_url_2", ActorRef.noSender());
则此行将被排除,但我想在特定时间(假设 3 次)内处理相同的操作(发生故障),并在它们之间定义间隔。请指导我实现这一目标。