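I have a Play Framework application that uses Play's actor integration to reference and communicate with a remote Akka system. It looks like I am not getting the remote reference right. First, the remote Akka system implements the Bootable interface, and it has a master node that creates a child actor system.
The Play actor system then references the remote system. The code snippets are shown below.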

This is the Play Framework node that creates the local actor system:

 public void connectMaster(final String classname) {
     localActor.tell(classname);
 }

 public void connectMaster() {
     // localActor.tell(getByte(new Coordinates()));
 }

 public void connectMaster(final WebSocket.In<JsonNode> in, final WebSocket.Out<JsonNode> out) {
     in.onMessage(new Callback<JsonNode>() {
         public void invoke(JsonNode event) throws JsonParseException, JsonMappingException, IOException {
             ObjectMapper mapper = new ObjectMapper();

             @SuppressWarnings("unchecked")
             Map<String, ArrayList<Object>> jsonMap = mapper.readValue(event, Map.class);
             GesturePoints gp = new GesturePoints();

             gp.setPoints(jsonMap);
             localActor.tell(gp);
         }
     });
 }

This is the local actor in the Play Framework application:

package controllers;

import Com.RubineEngine.GesturePoints.*;

import akka.actor.ActorRef;
import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;

public class LocalActor extends UntypedActor {

    ActorRef masterActor; // = getContext().actorFor("akka://MasterNode@127.0.0.1:2552/user/masterActor");
    LoggingAdapter log = Logging.getLogger(getContext().system(), this);

    @Override
    public void onReceive(Object arg) throws Exception {
        System.out.println(" Local Actor 1");
        if (arg instanceof GesturePoints) {
            System.out.println(" local Actor 2");
            masterActor.tell(arg, getSelf());
            System.out.println(" Local Actor 3");
        } else {
            unhandled(arg);
        }
    }

    public void preStart() {
        masterActor = getContext().actorFor("akka://MasterNode@127.0.0.1:2552/user/masterActor");
    }

}

This is the master node of the remote Akka system, which creates the master actor:

package Rubine_Cluster;

import java.util.Arrays;

import com.typesafe.config.ConfigFactory;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.kernel.Bootable;

/**
 * Hello world!
 *
 */
public class MasterNode implements Bootable {
    final ActorSystem system;
    ActorRef masterActor;

    public MasterNode() {
        // Create a child actor of this actor upon initialization
        system = ActorSystem.create("MasterNode", ConfigFactory.load()
            .getConfig("masterNode"));
        masterActor = system.actorOf(new Props(MasterActor.class), "masterActor");
    }

    public void startup() {
    }

    public void shutdown() {
        system.shutdown();
    }
}

This is the remote actor created by the MasterNode:

public class MasterActor extends UntypedActor {

    ActorSystem system = ActorSystem.create("container");

    ActorRef worker1;

    // public MasterActor() {}

    @Override
    public void onReceive(Object message) throws Exception {
        System.out.println(" Master Actor 5");

        if (message instanceof GesturePoints) {
            // GesturePoints gp = (GesturePoints) message;
            System.out.println(" Master Actor 1");
            try {
                worker1.tell(message, getSelf());
                System.out.println(" Master Actor 2");
            } catch (Exception e) {
                getSender().tell(new akka.actor.Status.Failure(e), getSelf());
                throw e;
            }
        } else {
            unhandled(message);
        }
    }

    public void preStart() {
        worker1 = getContext().actorFor("akka://WorkerNode@127.0.0.1:2553/user/workerActor");
    }

}

I think my reference is wrong, or possibly the way I am sending the message. Any suggestions are welcome.
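
To rule out a simple typo in the path strings, here is a minimal sketch in plain Java (no Akka dependency) of how I understand the remote paths in my code to be composed. `ActorPathCheck`, `build`, and `describe` are hypothetical names I made up for illustration; they are not part of Akka or Play. The sketch assumes the `akka://system@host:port/user/name` shape used in my `actorFor` calls, and it only checks the string itself, not whether the remote node's configuration actually binds to that system name, host, and port.

```java
import java.net.URI;

// Hypothetical helper, not part of Akka: builds and inspects an actor path string.
class ActorPathCheck {

    // Composes a path of the shape used in the actorFor calls in this post.
    static String build(String system, String host, int port, String actorName) {
        return "akka://" + system + "@" + host + ":" + port + "/user/" + actorName;
    }

    // Splits the path back into its parts using the standard URI parser,
    // so each part can be compared against the remote node's settings by eye.
    static String describe(String path) {
        URI uri = URI.create(path);
        return "system=" + uri.getUserInfo()
            + " host=" + uri.getHost()
            + " port=" + uri.getPort()
            + " path=" + uri.getPath();
    }

    public static void main(String[] args) {
        String path = build("MasterNode", "127.0.0.1", 2552, "masterActor");
        System.out.println(path);
        System.out.println(describe(path));
    }
}
```

As far as I can tell, the strings in `LocalActor` and `MasterActor` follow this shape, so if the reference is wrong it is more likely a mismatch with the remote configuration than a malformed path.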