我有两个应用程序,在两个方向上异步发送消息。我在两边都使用 ZMQ.DEALER 类型的套接字。连接状态还由心跳控制。
在连接问题(线路故障或一侧的应用程序重新启动)后,我现在无法使连接可靠地恢复。当我在服务器端重新启动应用程序时(执行bind()的端),客户端并不总是成功重新连接,然后需要重新启动,特别是当本地缓冲区达到HWM限制时。
除了在心跳失败或 send() 返回 false 的情况下重置完整的 ZMQ.Context 之外,我没有找到任何其他方法来使连接恢复可靠。然后我将调用 Context.term() 并再次创建 Context 和 Socket。这在我的测试中似乎运行良好。但现在我观察到 Context.term() 中偶尔出现挂断,这种情况很少见且难以重现。我知道,创建上下文应该只在应用程序启动时完成一次,但正如我所说,我没有找到其他方法来重新建立断开的连接。
我正在使用 JeroMQ 0.3.4。下面是一个测试应用程序的源代码,大约 200 行代码。
非常感谢任何解决此问题的提示。
import java.util.Calendar;
import org.zeromq.ZMQ;
public class JeroMQTest {
public interface IMsgListener {
public void newMsg(byte[] message);
}
final static int delay = 100;
final static boolean doResetContext = true;
static JeroMQTest jeroMQTest;
static boolean isServer;
private ZMQ.Context zContext;
private ZMQ.Socket zSocket;
private String address = "tcp://localhost:9889";
private long lastHeartbeatReceived = 0;
private long lastHeartbeatReplyReceived;
private boolean sendStat = true, serverIsActive = false, receiverInterrupted = false;
private Thread receiverThread;
private IMsgListener msgListener;
public static void main(String[] args) {
isServer = args.length > 0 && args[0].equals("true");
if (isServer) {
new JeroMQTest().runServer();
}
else {
new JeroMQTest().runClient();
}
}
public void runServer() {
msgListener = new IMsgListener() {
public void newMsg(byte[] message) {
String msgReceived = new String(message);
if (msgReceived.startsWith("HEARTBEAT")) {
String msgSent = "HEARTBEAT_REP " + msgReceived.substring(10);
sendStat = zSocket.send(msgSent.getBytes());
System.out.println("heartbeat rcvd, reply sent, status:" + sendStat);
lastHeartbeatReceived = getNow();
} else {
System.out.println("msg received:" + msgReceived);
}
}
};
createJmq();
sleep(1000);
int ct = 1;
while (true) {
boolean heartbeatsOk = lastHeartbeatReceived > getNow() - delay * 4;
if (heartbeatsOk) {
serverIsActive = true;
String msg = "SERVER " + ct;
sendStat = zSocket.send(msg.getBytes());
System.out.println("msg sent:" + msg + ", status:" + sendStat);
ct++;
}
if (serverIsActive && (!heartbeatsOk || !sendStat)) {
serverIsActive = false;
if (doResetContext) {
resetContext();
}
}
sleep(delay);
}
}
public void runClient() {
msgListener = new IMsgListener() {
public void newMsg(byte[] message) {
String msgReceived = new String(message);
if (msgReceived.startsWith("HEARTBEAT_REP")) {
System.out.println("HEARTBEAT_REP received:" + msgReceived);
lastHeartbeatReplyReceived = getNow();
}
else {
System.out.println("msg received:" + msgReceived);
}
}
};
createJmq();
sleep(1000);
int ct = 1;
boolean reconnectDone = false;
while (true) {
boolean heartbeatsOK = lastHeartbeatReplyReceived > getNow() - delay * 4;
String msg = "HEARTBEAT " + (ct++);
sendStat = zSocket.send(msg.getBytes());
System.out.println("heartbeat sent:" + msg + ", status:" + sendStat);
sleep(delay / 2);
if (sendStat) {
msg = "MSG " + ct;
sendStat = zSocket.send(msg.getBytes());
System.out.println("msg sent:" + msg + ", status:" + sendStat);
reconnectDone = false;
}
if ((!heartbeatsOK && lastHeartbeatReplyReceived > 0) || (!sendStat && !reconnectDone)) {
if (doResetContext) {
resetContext();
}
lastHeartbeatReplyReceived = 0;
reconnectDone = true;
}
sleep(delay / 2);
}
}
public void resetContext() {
closeJmq();
sleep(1000);
createJmq();
System.out.println("resetContext done");
}
private void createJmq() {
zContext = ZMQ.context(1);
zSocket = zContext.socket(ZMQ.DEALER);
zSocket.setSendTimeOut(100);
zSocket.setReceiveTimeOut(100);
zSocket.setSndHWM(10);
zSocket.setRcvHWM(10);
zSocket.setLinger(100);
if (isServer) {
zSocket.bind(address);
} else {
zSocket.connect(address);
}
receiverThread = new Thread() {
public void run() {
receiverInterrupted = false;
try {
ZMQ.Poller poller = new ZMQ.Poller(1);
poller.register(zSocket, ZMQ.Poller.POLLIN);
while (!receiverInterrupted) {
if (poller.poll(100) > 0) {
byte byteArr[] = zSocket.recv(0);
msgListener.newMsg(byteArr);
}
}
poller.unregister(zSocket);
} catch (Throwable e) {
System.out.println("Exception in ReceiverThread.run:" + e.getMessage());
}
}
};
receiverThread.start();
}
public void closeJmq() {
receiverInterrupted = true;
sleep(100);
zSocket.close();
zContext.term();
}
long getNow() {
Calendar now = Calendar.getInstance();
return (long) (now.getTime().getTime());
}
private static void sleep(int mSleep) {
try {
Thread.sleep(mSleep);
} catch (InterruptedException e) {
}
}
}