2

我正在开发具有两个模块的客户端-服务器应用程序:1)。注册表:网络中的所有服务器都向注册表注册它们正在侦听的 IP 和端口。2)。消息节点:这些也是服务器,一旦收到来自注册表的命令,它们就会在一轮消息中相互通信。消息传递节点一次只会向 1 个节点发送消息,但可能会从多个节点接收消息。

注册中心向所有其他服务器发送可用(已注册)节点的列表,并且在收到此列表后,节点开始相互交换消息。

我能够让注册表部分正常工作,并且节点能够自行注册。此外,节点成功地从注册表接收节点列表并能够开始交换消息。然而,消息传递节点不能接收所有正在发送的消息。

这是消息传递节点部分:

public class MessagingNode  {

    // Constructor
    public MessagingNode(String registryHost, int registryPort)  {
        try  {
            registryHostname = InetAddress.getByName(registryHost).getHostAddress();
        } catch (Exception e)  {
            System.out.println("Can't resolve Registry hostname!");
        }
        registryPortNumber = registryPort;

        // Start the node server
        try  {
            nodeServerSocket = new ServerSocket(0);
        }
        catch (Exception e)  {
            System.out.println("Can't start node server!");
        }

        // Store the MessagingNode server port and IP address
        try  {
            nodeIpAddress = InetAddress.getLocalHost().getHostAddress();
        } catch(Exception e)  {
            System.out.println("Can't get localhost");
        }
        nodePortNumber = nodeServerSocket.getLocalPort();
        //System.out.println("Node Port:"+nodePortNumber);

        // Initialize trackers
        sendTracker = 0;
        receiveTracker = 0;
        sendSummation = 0;
        receiveSummation = 0;
        sending = false;
    }



    // Start the NodeServer to listen for other nodes
    public void startNodeServer()  {
        if(!isRunning)  {
            isRunning = true;
            consoleListener();
            while(MessagingNode.this.isRunning)  {
                Socket clientSocket = null;
                try  {
                    clientSocket = nodeServerSocket.accept();
                    openClient(clientSocket);                       

                } 
                catch(IOException e)  {
                    e.printStackTrace();
                }

            }
        }
    }


    // Messaging nodes receiving 
    public void openClient(final Socket socket)  {
        Thread clientThread = new Thread()  {
            public void run()  {
                int count = 0;
                try  { 
                    byte[] messagePayload = new byte[64];

                    InputStream in = socket.getInputStream();
                    DataInputStream din = new DataInputStream(in);
                    //BufferedReader br =  new BufferedReader(new InputStreamReader(in));   
                    while(din.read(messagePayload) > -1)  {

                        //count++;

                        int type = InterpretMessage.getMessageType(messagePayload);
                        if(type == MessageTypes.MESSAGING_NODES_LIST)  {
                            trimNodeList(messagePayload);
                            System.out.println("Node list Received!");
                            dataThread();

                        } 
                        else  {         
                            handleIncomingPayload(messagePayload);
                        }
                    }
                    //System.out.println("Input connection closed after :" + count);
                //System.out.println("Listener Closed!");
                //in.close();
                //socket.close();

                }
                catch(Exception e)  {
                    e.printStackTrace();
                }


            }
        };
        clientThread.start();
        try  {
                    clientThread.join();
                }
                catch(Exception e)  {
                    e.printStackTrace();
                }


    }

    public void dataThread()    {
        Thread t = new Thread()  {
            public void run()  {
                sendData();
            }
        };
        t.start();
        try  {
            t.join();
        }
        catch(Exception e)  {
            e.printStackTrace();
        }
    }

    public void sendData()  {
        // pick a node
        int size = messagingNodesList.size();
        int index = (int)(Math.random() * size);
        //System.out.println("Node being contacted:" + messagingNodesList.get(index));
        String details = messagingNodesList.get(index);
        String[] str = details.split(" ");


        try  {
            Socket socket = new Socket(str[0], Integer.parseInt(str[1]));
            DataOutputStream out = new DataOutputStream(socket.getOutputStream());
            for(int i= 0; i < 5; i++)  {
                CommMessage outgoing = new CommMessage();
                byte[] msg = outgoing.marshall();
                out.write(msg, 0, msg.length);
                System.out.println("Sending number:" + outgoing.number);
                out.flush();
            }
            out.close();
            //closeConnection = true;
        }
        catch(IOException e)  {
            e.printStackTrace();
        }
    }

    // Remove Node's information
    public void trimNodeList(byte[] incoming)  {
    // Removes the current node's information from the list recieved from registry
    }

    // Update trackers
    public void handleIncomingPayload(byte[] payload)  {
        CommMessage msg = new CommMessage(payload);
        msg.unmarshall();
        System.out.println("Receiving Number:"+msg.number);
        //synchronized(this)  {
            receiveSummation += msg.number;
            receiveTracker += 1;
        //}
    }


    // Console Listener
    public void consoleListener()  {
        Thread listener = new Thread()  {
            public void run()  {
                // listen for keyboard instructions
            }
        };
        listener.start();
    }



    public static void main(String[] args)  {
        MessagingNode node = new MessagingNode(args[0], Integer.parseInt(args[1]));
        node.connectRegistry();
        node.startNodeServer();
        //node.deregisterNode();
        //node.deregisterNode();
        //node.sendBurst();
        // Launch the console Listener Thread
        // Initiate rounds of messaging

    }
}

问题出在我假设的接收部分的某个地方。我尝试在 sendData 方法上使用同步,因为有人指出这可能是一个并发问题,但它没有帮助。我仍然无法接收所有消息。任何见解都会非常有帮助。

谢谢。

4

0 回答 0