我接手了一个应用程序,并被告知必须把它从单端口、单连接的应用程序改造成支持多连接的应用程序。我对套接字编程还很陌生,非常需要一些帮助来解决这个问题。不幸的是,团队里没有其他人可以请教,我仍在努力阅读这段代码并熟悉这个应用程序。
我的代码问题如下。同一个类中的 run() 会调用 connect(),但我需要在每个连接建立之后都能继续执行 run() 的其余代码,并验证该 activeSocket 是否仍在 activeSockets 列表中。目前,connect() 会先把 activeSockets 列表填满(直到达到 maxActiveSockets 个连接),然后才让 run() 继续执行,而这正是问题的一部分。我曾尝试在建立第一个连接后跳出外层循环,但之后就再也看不到第二个连接进入应用程序了。我真的很需要另一双眼睛帮我看看这个问题。
我希望这是有道理的。
任何帮助/方向将不胜感激。谢谢你。
这是 run() 的代码:
/**
 * Thread body for this channel: connects, supervises the active sockets, and shuts down.
 *
 * <p>Flow, as implemented below:
 * <ol>
 *   <li>Marks the channel started, calls {@link #connect()} once (unless termination was already
 *       requested) and starts the timers.</li>
 *   <li>Until {@code shouldTerminate} is set, walks the sockets in {@code activeSockets}. For each
 *       one it blocks in {@code socket.wait()} until it is notified on that socket's monitor.
 *       When it wakes up (or did not wait at all) and the need-reconnect flag is set, it tears
 *       the connection down, removes it from {@code activeSockets}, and calls {@link #connect()}
 *       to obtain a replacement.</li>
 *   <li>On termination, notifies waiters on this object, closes every remaining socket, stops the
 *       timers, and signals the termination latches.</li>
 * </ol>
 *
 * <p>NOTE(review): the loop blocks on one socket's monitor at a time, so a failure on a different
 * socket is only noticed once the socket currently being waited on is notified. Confirm which
 * component calls {@code notifyAll()} on the socket monitors.
 *
 * <p>NOTE(review): if {@code activeSockets} is empty while {@code shouldTerminate} is false, the
 * supervision loop has nothing to wait on and spins. Confirm whether that state is reachable.
 */
public void run() {
    started = true;
    if (!shouldTerminate) {
        LOGGER.info("Connecting to host " + Thread.currentThread().getName());
        connect();
        hardstandin = false;
    }
    startTimers();

    // Make sure the connections stay up unless shouldTerminate is set.
    while (!shouldTerminate) {
        // 20130912 MS - changed to work on the ActiveSocket entries instead of single socket fields.
        // Iterate over a snapshot: the reconnect path below removes from activeSockets and
        // connect() adds to it, which must not happen under a live iterator of the same list.
        final ActiveSocket[] snapshot = activeSockets.toArray(new ActiveSocket[0]);
        for (final ActiveSocket activeSocket : snapshot) {
            final Socket socket = activeSocket.getSocket();

            if (socket != null) {
                synchronized (socket) {
                    try {
                        // Sleep until notified on this socket's monitor, unless we are already
                        // shutting down or a reconnect has already been requested.
                        if (!shouldTerminate && !getNeedReconnect().get()) {
                            hardstandin = false;
                            socket.wait();
                        }
                    } catch (final InterruptedException ex) {
                        LOGGER.info("Error waiting for new socket Interrupted Exception received.");
                    }
                }
            }

            if (getNeedReconnect().get()) {
                hardstandin = true;
                final String channelName = "Bank " + bankInit + "; Bank ID " + getBankID();
                final String logMessage = channelName.concat("\n\nData input stream on socket connection could not be read. Attempting to re-establish the connection.");
                final String subject = channelName.concat(" Socket connection error");
                ATMServer.sendNotification(subject, logMessage);

                // The socket and its streams may legitimately be null (e.g. stream creation
                // failed in connect()), so guard every close.
                if (socket != null) {
                    try {
                        socket.close();
                    } catch (final IOException ex) {
                        LOGGER.info(ERR_SOCKCLOSE);
                    }
                }
                final DataInputStream dataInputStream = activeSocket.getDataInputStream();
                final DataOutputStream dataOutputStream = activeSocket.getDataOutputStream();
                try {
                    if (dataInputStream != null) {
                        dataInputStream.close();
                    }
                    if (dataOutputStream != null) {
                        dataOutputStream.close();
                    }
                } catch (final IOException e) {
                    LOGGER.warn("Could not close input/output streams: " + FormatData.formatStack(e));
                }

                // Drop the dead entry so connect() sees a free slot for the replacement.
                // remove() is a no-op if the entry is already gone.
                activeSockets.remove(activeSocket);

                final int socketsBeforeReconnect = activeSockets.size();
                connect();

                // connect() reports success only by adding to activeSockets, so a grown list is
                // the signal that a replacement connection was registered.
                final boolean reconnected = activeSockets.size() > socketsBeforeReconnect;
                if (reconnected || shouldTerminate) {
                    getNeedReconnect().set(false);
                    hardstandin = false;
                    synchronized (this) {
                        LOGGER.info("Sending notifyAll on channel");
                        this.notifyAll();
                    }
                }
            }
        }
    }

    // *** shutdown the process
    // need to shutdown the positiveBalance
    // need to shut down each receive queuer
    synchronized (this) {
        LOGGER.info("Sending notifyAll on channel");
        this.notifyAll();
    }

    // 20130913 MS - close every active socket rather than a single socket field.
    for (final ActiveSocket activeSocket : activeSockets) {
        final Socket openSocket = activeSocket.getSocket();
        if (openSocket != null) {
            try {
                openSocket.close();
            } catch (final IOException ex) {
                LOGGER.info(ERR_SOCKCLOSE);
            }
        }
    }

    stopTimers();
    terminated = true;
    started = false;

    // Wait (bounded by COUNTDOWN_WAIT_SECONDS) on the internal latch before reporting
    // termination on terminatedSignal.
    if (internalTerminatedSignal != null) {
        try {
            internalTerminatedSignal.await(COUNTDOWN_WAIT_SECONDS, TimeUnit.SECONDS);
        } catch (final InterruptedException e) {
            LOGGER.info(FormatData.fullStackTrace(e));
        }
    }
    if (terminatedSignal != null) {
        LOGGER.info(this.getClass().getName() + " Updating countdown latch");
        terminatedSignal.countDown();
    }
}
这是 connect() 的代码:
/**
 * Establishes socket connections and registers them in {@code activeSockets}.
 *
 * <p>Server mode ({@code isServer} is true): opens {@code serverSocket} on {@code port} if it is
 * not open yet, then accepts connections until {@code activeSockets} holds
 * {@code maxActiveSockets} entries, and returns. Returns immediately if {@code host} is null.
 *
 * <p>Client mode: connects to {@code remoteHost:remotePort}, retrying until one connection
 * succeeds, and returns after that single connection. While {@code isInForcedStandIn()} is true
 * no connection attempt is made.
 *
 * <p>For every new connection, keep-alive is enabled, buffered data streams are attached to the
 * {@link ActiveSocket}, and its receive queuers are started. The method also returns as soon as
 * {@code shouldTerminate} is set.
 *
 * <p>NOTE(review): if stream creation fails, the {@link ActiveSocket} has already been added to
 * {@code activeSockets} and stays there without streams. Confirm whether callers expect that.
 *
 * <p>NOTE(review): a failing {@code new ServerSocket(port)} is retried immediately with no
 * delay. Confirm whether a back-off is wanted.
 */
public void connect() {
    boolean clientSocket = false;
    LOGGER.info("Connecting ... ");
    while (!shouldTerminate) {
        ActiveSocket activeSocket = null;
        Socket newSocket = null;
        int tries = 0;
        int loopCounter = 0;
        if (isServer) {
            if (host == null) {
                LOGGER.info("Must specify a host ip or host name with HOST configuration tag ");
                return;
            } else {
                while (!shouldTerminate && (activeSockets.size() < this.maxActiveSockets)) {
                    activeSocket = new ActiveSocket(this);
                    newSocket = null;
                    if (serverSocket != null) {
                        try {
                            LOGGER.info("Accept socket.");
                            newSocket = serverSocket.accept();
                        } catch (final IOException ex) {
                            if (shouldTerminate) {
                                if (ex.getMessage() != null) {
                                    LOGGER.info(ex.getMessage());
                                }
                            } else {
                                LOGGER.info("IOException while accepting connection.");
                                LOGGER.warn(FormatData.fullStackTrace(ex));
                            }
                            newSocket = null;
                        }
                        if (newSocket != null) {
                            try {
                                LOGGER.info("Server socket keepalive ... ");
                                newSocket.setKeepAlive(true);
                                if (newSocket.getKeepAlive()) {
                                    LOGGER.info("Server socket keep alive to host (" + host + "," + port + ") for BankID:" + Integer.toString(this.getBankID()));
                                }
                                activeSocket.setSocket(newSocket);
                                activeSockets.add(activeSocket);
                                increaseConnects();
                                break;
                            } catch (final SocketException ex) {
                                LOGGER.info("SocketException while opening socket.");
                                LOGGER.warn(FormatData.fullStackTrace(ex));
                                // The accepted socket is being abandoned; close it so the
                                // descriptor is not leaked.
                                try {
                                    newSocket.close();
                                } catch (final IOException ignored) {
                                    // Best effort: the socket is already unusable.
                                }
                                newSocket = null;
                            }
                        }
                    } else { // first time through the loop
                        try {
                            LOGGER.info("Opening server socket (" + host + "," + port + ") for BankID:" + Integer.toString(this.getBankID()));
                            serverSocket = new ServerSocket(port);
                        } catch (final IOException ex) {
                            LOGGER.info("Unable to open server socket socket (" + host + "," + port + ")");
                            // getMessage() may be null; treat that as "not the bad-address case".
                            final String message = ex.getMessage();
                            if ((message != null) && (message.indexOf("Cannot assign requested address") > -1)) {
                                this.terminate();
                                final String logMessage = "Invalid IP Address assigned:" + host + ",port:" + port;
                                final String subject = logMessage;
                                ATMServer.sendNotification(subject, logMessage);
                            } else if (tries == 0) {
                                // Only notify on the first failure of this pass.
                                tries++;
                                final String logMessage = "Unable to open server socket (" + host + "," + port + ")";
                                final String subject = "Unable to open server socket (" + host + "," + port + ")";
                                ATMServer.sendNotification(subject, logMessage);
                            }
                            LOGGER.warn(FormatData.fullStackTrace(ex));
                        }
                    }
                }
            }
        } else { // client socket -- connecting to entity
            clientSocket = true;
            while (!shouldTerminate && (activeSockets.size() < this.maxActiveSockets)) {
                activeSocket = new ActiveSocket(this);
                newSocket = null;
                if (this.isInForcedStandIn()) {
                    LOGGER.info("Forced standin " + getName());
                    try {
                        Thread.sleep(3000);
                    } catch (final InterruptedException ex) {
                        LOGGER.info("Interrupted while waiting for connection.");
                        LOGGER.warn(FormatData.fullStackTrace(ex));
                    }
                } else {
                    if (!shouldTerminate) {
                        // Attempt a connection on every 120th pass; the other passes wait up to
                        // one second each on clientConnectLock.
                        if ((loopCounter % 120) == 0) {
                            try {
                                LOGGER.info("Connecting to host (" + remoteHost + "," + remotePort + ") for BankID:" + Integer.toString(this.getBankID()));
                                newSocket = new Socket(InetAddress.getByName(remoteHost), remotePort);
                                if (newSocket.isConnected()) {
                                    tries = 0;
                                    LOGGER.info("Client socket connected to host (" + remoteHost + "," + remotePort + ") for BankID:" + Integer.toString(this.getBankID()));
                                }
                                newSocket.setKeepAlive(true);
                                if (newSocket.getKeepAlive()) {
                                    LOGGER.info("Client socket keep alive to host (" + remoteHost + "," + remotePort + ") for BankID:" + Integer.toString(this.getBankID()));
                                }
                                activeSocket.setSocket(newSocket);
                                activeSockets.add(activeSocket);
                                increaseConnects();
                                break;
                            } catch (final IOException ex) {
                                loopCounter++;
                                tries++;
                                LOGGER.info("SocketException while opening remote socket (" + remoteHost + "," + remotePort + ") " + " " + ex.getClass() + " " + ex.getMessage());
                                if ((tries % 300) == 0) {
                                    recordErrorToDatabase(ex.getMessage());
                                }
                                // A socket that connected but could not be configured was never
                                // registered; close and forget it so it is not leaked or
                                // mistaken for a usable connection below.
                                if (newSocket != null) {
                                    try {
                                        newSocket.close();
                                    } catch (final IOException ignored) {
                                        // Best effort: the socket is already unusable.
                                    }
                                    newSocket = null;
                                }
                            }
                        } else {
                            loopCounter++;
                            try {
                                synchronized (clientConnectLock) {
                                    clientConnectLock.wait(1000);
                                }
                            } catch (final InterruptedException inex) {
                                LOGGER.info("SocketException while opening remote socket " + Thread.currentThread().getName());
                                LOGGER.warn(FormatData.fullStackTrace(inex));
                                if (!this.shouldTerminate) {
                                    recordErrorToDatabase("InterruptedException without terminate set.");
                                }
                            }
                        }
                    }
                }
            }
        }
        try {
            // If a new connection was established, attach data input/output streams to it.
            // The inner loops can also exit without a connection (shutdown, forced stand-in),
            // in which case activeSocket is set but newSocket is null; skip stream setup then.
            if ((activeSocket != null) && (newSocket != null)) {
                LOGGER.info("Creating serverIn/serverOut data streams " + Thread.currentThread().getName());
                DataInputStream dataInputStream = new DataInputStream(new BufferedInputStream(newSocket.getInputStream()));
                if (newSocket.isConnected()) {
                    LOGGER.info("socket still connected to host (" + remoteHost + "," + remotePort + ") for BankID:" + Integer.toString(this.getBankID()));
                }
                DataOutputStream dataOutputStream = new DataOutputStream(new BufferedOutputStream(newSocket.getOutputStream(), 2048));
                activeSocket.setDataInputStream(dataInputStream);
                activeSocket.setDataOutputStream(dataOutputStream);
                activeSocket.setNumReceivers(this.numReceivers);
                ReceiveQueuer[] receiveQueuers = activeSocket.getReceiveQueuers();
                LOGGER.info("Starting receive queuers");
                for (int cnt = 0; cnt < numReceivers; cnt++) {
                    receiveQueuers[cnt].setName(this.systemName + "-Socket-" + Integer.toString(activeSockets.size()) + "-ReceiveQueuer-" + Integer.toString(cnt));
                    receiveQueuers[cnt].setActiveSocket(activeSocket);
                    receiveQueuers[cnt].start();
                }
            }
            // A client returns after one pass. A server returns once the list is full;
            // without this exit the outer loop would spin forever with nothing left to accept.
            if (clientSocket || (activeSockets.size() >= this.maxActiveSockets)) {
                break;
            }
        } catch (final Exception ex) {
            LOGGER.info("Exception while creating input/output streams " + Thread.currentThread().getName());
            LOGGER.warn(FormatData.fullStackTrace(ex));
        }
    }
    if (!shouldTerminate) {
        LOGGER.info("Socket connection complete " + Thread.currentThread().getName());
    } else {
        LOGGER.info("Stopped establishing socket connection " + Thread.currentThread().getName());
    }
}