1

我正在关注来自 akka.io 容错的代码http://doc.akka.io/docs/akka/current/java/fault-tolerance.html.我已将此代码作为参考。我的要求如下:假设演员在一条消息上崩溃并由他的主管重新启动。然后他开始处理他邮箱中的下一条消息。导致崩溃的消息被“丢弃”。但我想在特定时间(假设 3 次)内处理相同的操作,并且它们之间有定义的间隔(假设 1 秒)。如何使用 akka 监督来执行此操作。实际上通过演员我正在尝试检查特定服务api是否正在工作(即给出一些异常)。因此,如果特定尝试有任何异常(假设未找到404),请将消息重新发送给失败的工作人员,直到maxNrOfRetries 由 supervisorStrategy 指定。如果工人失败了“maxNrOfRetries”次,那么就记录下来“

我的主管班:

public class Supervisor extends UntypedActor {


 private static SupervisorStrategy strategy =

 new OneForOneStrategy(3, Duration.create("1 minute"),
  new Function<Throwable, Directive>() {
    @Override
    public Directive apply(Throwable t) {
      if (t instanceof Exception) {
        return restart();
      }else if (t instanceof IllegalArgumentException) {
        return stop();
      } else {
        return escalate();
      }
    }
  });

 @Override
 public SupervisorStrategy supervisorStrategy() {
 return strategy;


}
public void onReceive(Object o) {
if (o instanceof Props) {
  getSender().tell(getContext().actorOf((Props) o), getSelf());
} else {
  unhandled(o);
}


 }
}

儿童班:

public class Child extends UntypedActor {


  public void onReceive(Object o) throws Exception {
if (o instanceof String) {
Object response = someFunction( (String) message);//this function returns either successfull messgae as string or exception
if(response instanceOf Exception) {
     throw (Exception) response;
   } 
   else
     getSender().tell(response, getSelf())
}else {
  unhandled(o);
}


}

}

创建演员:

Props superprops = Props.create(Supervisor.class);
ActorRef supervisor = system.actorOf(superprops, "supervisor");
ActorRef child = (ActorRef) Await.result(ask(supervisor,
Props.create(Child.class), 5000), timeout);
child.tell("serVice_url", ActorRef.noSender());

对于 service_url,如果发生故障,我想重复该过程。但它没有发生。如果在 creatng actor 中写入下一行,child.tell("serVice_url_2", ActorRef.noSender());则此行将被排除,但我想在特定时间(假设 3 次)内处理相同的操作(发生故障),并在它们之间定义间隔。请指导我实现这一目标。

4

1 回答 1

0

我想我已经开发了一种方法。尽管我仍然需要在生产级别上进行测试。我在下面写下答案,因为它可能对尝试实现相同目标的人有所帮助。如果有人找到更好的方法,那么他/她很受欢迎。在这里要提一下,通过这种方法,Supervisor 在一个时间范围内处理相同的操作(以及发生故障的消息)特定次数(假设 3 次)。我无法定义它们之间的间隔。 这是代码。主管班。

public class MyUntypedActor extends UntypedActor {
//here I have given Max no retrilas as 10.I will controll this number from logic as per my own requirements.But user given number of retrials can not exceed 10.
private static SupervisorStrategy strategy = new AllForOneStrategy(10, Duration.create(5, TimeUnit.MINUTES),
        new Function<Throwable, SupervisorStrategy.Directive>() {
            @Override
            public SupervisorStrategy.Directive apply(Throwable t) {
                if (t instanceof Exception) {
                    //System.out.println("exception" + "*****" + t.getMessage() + "***" + t.getLocalizedMessage());
                    return restart();
                } else if (t instanceof NullPointerException) {
                    return restart();
                } else if (t instanceof IllegalArgumentException) {
                    return stop();
                } else {
                    return escalate();
                }
            }
        });

@Override
public SupervisorStrategy supervisorStrategy() {
    return strategy;
}

public void onReceive(Object o) {
    if (o instanceof Props) {
        getSender().tell(getContext().actorOf((Props) o), getSelf());
    } else {
        unhandled(o);
    }
}
}

我们将在其中编写逻辑的子类。

public class Child extends UntypedActor {
//Through preRestart it will push the message for which exception occured before the restart of the child
@Override
public void preRestart(final Throwable reason, final scala.Option<Object> message) throws Exception {
    System.out.println("reStarting :::" + message.get());
    SetRules.setRemainingTrials(SetRules.remainingTrials + 1);
    getSelf().tell(message.get(), getSender());
};

public void onReceive(Object o) throws Exception {

    if (o instanceof Exception) {
        throw (Exception) o;
    } else if (o instanceof Integer) {
    } else if (o.equals("get")) {
        getSender().tell("get", getSelf());
    } else if (o instanceof String) {

        try {
            // here either we can write our logic directly or for a better
            // approach can call a function where the logic will be excuted.
            getSender().tell("{\"meggase\":\"Succesfull after " + SetRules.remainingTrials + " retrials\"}",
                    getSelf());
        } catch (Exception ex) {
            if (SetRules.remainingTrials == SetRules.noOfRetries) {
                getSender().tell("{\"meggase\":\"Failed to connect after " + SetRules.noOfRetries + " retrials\"}",
                        getSelf());
            } else {
                Exception value1 = ex;
                throw (Exception) value1;
            }
        }
    } else {
        unhandled(o);
    }
}
}

具有有关用户信息的 SetRules 类提供 noOfRetrials 并且还通过剩余试验存储有关每个重试状态的重试次数信息

public class SetRules {

public static int noOfRetries;
public static int remainingTrials;

public SetRules(int noOfRetries, int remainingTrials) {
    super();
    SetRules.noOfRetries = noOfRetries;
    SetRules.remainingTrials = remainingTrials;
}

public int getRemainingTrials() {
    return remainingTrials;
}

public static void setRemainingTrials(int remainingTrials) {
    SetRules.remainingTrials = remainingTrials;
}
}

现在让我们创建演员。

Props superprops = Props.create(MyUntypedActor.class);
SetRules setRules=new SetRules(3,0);
ActorSystem system = ActorSystem.create("helloakka");
ActorRef supervisor = system.actorOf(superprops, "supervisor");
ActorRef child = (ActorRef) Await.result(ask(supervisor, Props.create(Child.class), 5000), Duration.create(5, "minutes"));
Future<Object> future = Patterns.ask(child, service_Url, new Timeout(Duration.create(5, "minutes")));
Object result =  Await.result(future, Duration.create(5, "minutes"));
System.out.println(result);
于 2017-02-22T07:26:59.247 回答