我正在开发具有两个模块的客户端-服务器应用程序:1)。注册表:网络中的所有服务器都向注册表注册它们正在侦听的 IP 和端口。2)。消息节点:这些也是服务器,一旦收到来自注册表的命令,它们就会在一轮消息中相互通信。消息传递节点一次只会向 1 个节点发送消息,但可能会从多个节点接收消息。
注册中心向所有其他服务器发送可用(已注册)节点的列表,并且在收到此列表后,节点开始相互交换消息。
我能够让注册表部分正常工作,并且节点能够自行注册。此外,节点成功地从注册表接收节点列表并能够开始交换消息。然而,消息传递节点不能接收所有正在发送的消息。
这是消息传递节点部分:
public class MessagingNode {
// Constructor
public MessagingNode(String registryHost, int registryPort) {
try {
registryHostname = InetAddress.getByName(registryHost).getHostAddress();
} catch (Exception e) {
System.out.println("Can't resolve Registry hostname!");
}
registryPortNumber = registryPort;
// Start the node server
try {
nodeServerSocket = new ServerSocket(0);
}
catch (Exception e) {
System.out.println("Can't start node server!");
}
// Store the MessagingNode server port and IP address
try {
nodeIpAddress = InetAddress.getLocalHost().getHostAddress();
} catch(Exception e) {
System.out.println("Can't get localhost");
}
nodePortNumber = nodeServerSocket.getLocalPort();
//System.out.println("Node Port:"+nodePortNumber);
// Initialize trackers
sendTracker = 0;
receiveTracker = 0;
sendSummation = 0;
receiveSummation = 0;
sending = false;
}
// Start the NodeServer to listen for other nodes
public void startNodeServer() {
if(!isRunning) {
isRunning = true;
consoleListener();
while(MessagingNode.this.isRunning) {
Socket clientSocket = null;
try {
clientSocket = nodeServerSocket.accept();
openClient(clientSocket);
}
catch(IOException e) {
e.printStackTrace();
}
}
}
}
// Messaging nodes receiving
public void openClient(final Socket socket) {
Thread clientThread = new Thread() {
public void run() {
int count = 0;
try {
byte[] messagePayload = new byte[64];
InputStream in = socket.getInputStream();
DataInputStream din = new DataInputStream(in);
//BufferedReader br = new BufferedReader(new InputStreamReader(in));
while(din.read(messagePayload) > -1) {
//count++;
int type = InterpretMessage.getMessageType(messagePayload);
if(type == MessageTypes.MESSAGING_NODES_LIST) {
trimNodeList(messagePayload);
System.out.println("Node list Received!");
dataThread();
}
else {
handleIncomingPayload(messagePayload);
}
}
//System.out.println("Input connection closed after :" + count);
//System.out.println("Listener Closed!");
//in.close();
//socket.close();
}
catch(Exception e) {
e.printStackTrace();
}
}
};
clientThread.start();
try {
clientThread.join();
}
catch(Exception e) {
e.printStackTrace();
}
}
public void dataThread() {
Thread t = new Thread() {
public void run() {
sendData();
}
};
t.start();
try {
t.join();
}
catch(Exception e) {
e.printStackTrace();
}
}
public void sendData() {
// pick a node
int size = messagingNodesList.size();
int index = (int)(Math.random() * size);
//System.out.println("Node being contacted:" + messagingNodesList.get(index));
String details = messagingNodesList.get(index);
String[] str = details.split(" ");
try {
Socket socket = new Socket(str[0], Integer.parseInt(str[1]));
DataOutputStream out = new DataOutputStream(socket.getOutputStream());
for(int i= 0; i < 5; i++) {
CommMessage outgoing = new CommMessage();
byte[] msg = outgoing.marshall();
out.write(msg, 0, msg.length);
System.out.println("Sending number:" + outgoing.number);
out.flush();
}
out.close();
//closeConnection = true;
}
catch(IOException e) {
e.printStackTrace();
}
}
// Remove Node's information
public void trimNodeList(byte[] incoming) {
// Removes the current node's information from the list recieved from registry
}
// Update trackers
public void handleIncomingPayload(byte[] payload) {
CommMessage msg = new CommMessage(payload);
msg.unmarshall();
System.out.println("Receiving Number:"+msg.number);
//synchronized(this) {
receiveSummation += msg.number;
receiveTracker += 1;
//}
}
// Console Listener
public void consoleListener() {
Thread listener = new Thread() {
public void run() {
// listen for keyboard instructions
}
};
listener.start();
}
public static void main(String[] args) {
MessagingNode node = new MessagingNode(args[0], Integer.parseInt(args[1]));
node.connectRegistry();
node.startNodeServer();
//node.deregisterNode();
//node.deregisterNode();
//node.sendBurst();
// Launch the console Listener Thread
// Initiate rounds of messaging
}
}
问题出在我假设的接收部分的某个地方。我尝试在 sendData 方法上使用同步,因为有人指出这可能是一个并发问题,但它没有帮助。我仍然无法接收所有消息。任何见解都会非常有帮助。
谢谢。