3

我有两个应用程序,在两个方向上异步发送消息。我在两边都使用 ZMQ.DEALER 类型的套接字。连接状态还由心跳控制。

在出现连接问题(线路故障或某一侧的应用程序重新启动)之后,我目前无法让连接可靠地恢复。当我重新启动服务器端(即执行 bind() 的一端)的应用程序时,客户端并不总能成功重新连接,此时就需要重新启动客户端,尤其是在本地缓冲区已达到 HWM 限制的情况下。

除了在心跳失败或 send() 返回 false 时重置整个 ZMQ.Context 之外,我没有找到其他能让连接可靠恢复的方法。具体做法是先调用 Context.term(),然后重新创建 Context 和 Socket。这在我的测试中似乎运行良好。但现在我观察到 Context.term() 偶尔会挂起(hang),这种情况很少见且难以重现。我知道上下文应该只在应用程序启动时创建一次,但正如我所说,我没有找到其他方法来重新建立断开的连接。

我正在使用 JeroMQ 0.3.4。下面是一个测试应用程序的源代码,大约 200 行代码。

非常感谢任何解决此问题的提示。

import java.util.Calendar;
import org.zeromq.ZMQ;

/**
 * Small two-way messaging test harness built on a single ZMQ.DEALER socket.
 *
 * <p>Started with the argument {@code true} the process binds to {@link #address} and acts as the
 * server; otherwise it connects and acts as the client. The client sends {@code HEARTBEAT n}
 * messages, the server answers with {@code HEARTBEAT_REP n}; both sides additionally exchange
 * payload messages. When heartbeats stop or a send fails, the context and socket are torn down
 * and recreated (if {@link #doResetContext} is set).
 *
 * <p>Threading: a ZeroMQ socket must not be used by two threads at the same time. The socket is
 * touched by the main loop (send) and by the receiver thread (recv, and send from the listener),
 * so every socket call is serialised through {@link #socketLock}. The socket is only closed after
 * the receiver thread has fully exited.
 */
public class JeroMQTest {
    /** Callback invoked on the receiver thread for every message taken off the socket. */
    public interface IMsgListener {
        /**
         * Handles one received message.
         *
         * @param message raw message bytes; never {@code null}
         */
        public void newMsg(byte[] message);
    }

    /** Base period of the send loops, in milliseconds. */
    final static int delay = 100;
    /** Whether a detected failure triggers a full context/socket re-creation. */
    final static boolean doResetContext = true;
    static JeroMQTest jeroMQTest;
    /** Role of this process; set once in {@link #main(String[])} and read by {@link #createJmq()}. */
    static boolean isServer;

    /** Explicit charset so both peers encode/decode identically regardless of platform default. */
    private static final java.nio.charset.Charset CHARSET = java.nio.charset.StandardCharsets.UTF_8;
    /** Pause between non-blocking receive attempts when no message is available, in milliseconds. */
    private static final int RECEIVE_IDLE_SLEEP_MS = 5;
    /** Length of the {@code "HEARTBEAT "} prefix (including the separating space). */
    private static final int HEARTBEAT_PREFIX_LENGTH = 10;

    /** Guards every call on {@link #zSocket}; the socket itself is not safe for concurrent use. */
    private final Object socketLock = new Object();
    private ZMQ.Context zContext;
    private ZMQ.Socket zSocket;
    private String address = "tcp://localhost:9889";
    // Written by the receiver thread, read by the main loop -> volatile for visibility.
    private volatile long lastHeartbeatReceived = 0;
    private volatile long lastHeartbeatReplyReceived;
    // Written by the main loop and (on the server) by the receiver thread's listener.
    private volatile boolean sendStat = true;
    private boolean serverIsActive = false;
    // Stop request for the receiver thread; written by the main thread, polled by the receiver.
    private volatile boolean receiverInterrupted = false;
    private Thread receiverThread;
    private IMsgListener msgListener;

    /**
     * Entry point.
     *
     * @param args {@code args[0] == "true"} selects the server role; anything else the client role
     */
    public static void main(String[] args) {
        isServer = args.length > 0 && "true".equals(args[0]);

        if (isServer) {
            new JeroMQTest().runServer();
        } else {
            new JeroMQTest().runClient();
        }
    }

    /**
     * Runs the server loop: answers heartbeats and, while heartbeats are fresh, sends a numbered
     * {@code SERVER n} message every {@link #delay} ms. Resets the connection when heartbeats stop
     * or a send fails. Returns (after closing the socket) only if the calling thread is interrupted.
     */
    public void runServer() {
        msgListener = new IMsgListener() {
            @Override
            public void newMsg(byte[] message) {
                String msgReceived = new String(message, CHARSET);
                if (msgReceived.startsWith("HEARTBEAT")) {
                    // Guard the substring: a bare "HEARTBEAT" would otherwise throw and kill
                    // the receiver thread.
                    String sequence = msgReceived.length() >= HEARTBEAT_PREFIX_LENGTH
                            ? msgReceived.substring(HEARTBEAT_PREFIX_LENGTH)
                            : "";
                    String msgSent = "HEARTBEAT_REP " + sequence;
                    sendStat = sendText(msgSent);
                    System.out.println("heartbeat rcvd, reply sent, status:" + sendStat);
                    lastHeartbeatReceived = getNow();
                } else {
                    System.out.println("msg received:" + msgReceived);
                }
            }
        };

        createJmq();
        sleep(1000);

        int ct = 1;
        while (!Thread.currentThread().isInterrupted()) {
            boolean heartbeatsOk = lastHeartbeatReceived > getNow() - delay * 4;
            if (heartbeatsOk) {
                serverIsActive = true;
                String msg = "SERVER " + ct;
                sendStat = sendText(msg);
                System.out.println("msg sent:" + msg + ", status:" + sendStat);
                ct++;
            }

            if (serverIsActive && (!heartbeatsOk || !sendStat)) {
                serverIsActive = false;
                if (doResetContext) {
                    resetContext();
                }
            }
            sleep(delay);
        }
        closeJmq();
    }

    /**
     * Runs the client loop: sends a heartbeat and, if that succeeded, a payload message every
     * {@link #delay} ms. Resets the connection when heartbeat replies stop arriving or a send
     * fails. Returns (after closing the socket) only if the calling thread is interrupted.
     */
    public void runClient() {
        msgListener = new IMsgListener() {
            @Override
            public void newMsg(byte[] message) {
                String msgReceived = new String(message, CHARSET);
                if (msgReceived.startsWith("HEARTBEAT_REP")) {
                    System.out.println("HEARTBEAT_REP received:" + msgReceived);
                    lastHeartbeatReplyReceived = getNow();
                } else {
                    System.out.println("msg received:" + msgReceived);
                }
            }
        };

        createJmq();
        sleep(1000);

        int ct = 1;
        boolean reconnectDone = false;
        while (!Thread.currentThread().isInterrupted()) {
            boolean heartbeatsOK = lastHeartbeatReplyReceived > getNow() - delay * 4;
            String msg = "HEARTBEAT " + (ct++);
            sendStat = sendText(msg);
            System.out.println("heartbeat sent:" + msg + ", status:" + sendStat);
            sleep(delay / 2);

            if (sendStat) {
                msg = "MSG " + ct;
                sendStat = sendText(msg);
                System.out.println("msg sent:" + msg + ", status:" + sendStat);
                reconnectDone = false;
            }

            // Reset once per outage: either replies went stale after having been seen at least
            // once, or a send failed and no reset has been done since the last successful send.
            if ((!heartbeatsOK && lastHeartbeatReplyReceived > 0) || (!sendStat && !reconnectDone)) {
                if (doResetContext) {
                    resetContext();
                }
                lastHeartbeatReplyReceived = 0;
                reconnectDone = true;
            }
            sleep(delay / 2);
        }
        closeJmq();
    }

    /** Tears down the current context/socket, waits one second and creates fresh ones. */
    public void resetContext() {
        closeJmq();
        sleep(1000);
        createJmq();
        System.out.println("resetContext done");
    }

    /**
     * Creates the context and DEALER socket, binds or connects according to {@link #isServer},
     * and starts the receiver thread.
     */
    private void createJmq() {
        synchronized (socketLock) {
            zContext = ZMQ.context(1);
            zSocket = zContext.socket(ZMQ.DEALER);
            zSocket.setSendTimeOut(100);
            zSocket.setReceiveTimeOut(100);
            zSocket.setSndHWM(10);
            zSocket.setRcvHWM(10);
            zSocket.setLinger(100);

            if (isServer) {
                zSocket.bind(address);
            } else {
                zSocket.connect(address);
            }
        }

        // Clear the stop flag BEFORE the thread starts. Clearing it inside run() could overwrite
        // a stop request issued between start() and the first line of run().
        receiverInterrupted = false;
        // The thread receives on the socket it was created for, not on whatever the field
        // happens to reference later.
        final ZMQ.Socket socket = zSocket;
        receiverThread = new Thread(new Runnable() {
            @Override
            public void run() {
                receiveLoop(socket);
            }
        }, "jmq-receiver");
        receiverThread.start();
    }

    /**
     * Body of the receiver thread: takes messages off {@code socket} without blocking and hands
     * them to {@link #msgListener} until a stop is requested.
     *
     * @param socket the socket this receiver instance is bound to
     */
    private void receiveLoop(ZMQ.Socket socket) {
        try {
            while (!receiverInterrupted && !Thread.currentThread().isInterrupted()) {
                byte[] data;
                // Non-blocking receive so the lock is held only briefly and senders are not
                // stalled behind a poll timeout.
                synchronized (socketLock) {
                    data = socket.recv(ZMQ.DONTWAIT);
                }
                if (data == null) {
                    // Nothing available right now.
                    sleep(RECEIVE_IDLE_SLEEP_MS);
                } else {
                    // Listener runs outside the lock; it may itself send via sendText().
                    msgListener.newMsg(data);
                }
            }
        } catch (RuntimeException e) {
            System.out.println("Exception in ReceiverThread.run:" + e);
        }
    }

    /**
     * Sends {@code text} on the socket while holding {@link #socketLock}.
     *
     * @param text message to send
     * @return the status returned by the socket's {@code send}
     */
    private boolean sendText(String text) {
        synchronized (socketLock) {
            return zSocket.send(text.getBytes(CHARSET));
        }
    }

    /**
     * Stops the receiver thread, waits until it has exited, then closes the socket and terminates
     * the context. Safe to call when nothing is open.
     */
    public void closeJmq() {
        receiverInterrupted = true;
        // The socket must not be closed while another thread may still be using it; wait for
        // the receiver to finish instead of sleeping for a guessed amount of time.
        joinUninterruptibly(receiverThread);
        receiverThread = null;

        synchronized (socketLock) {
            if (zSocket != null) {
                zSocket.close();
                zSocket = null;
            }
        }
        if (zContext != null) {
            zContext.term();
            zContext = null;
        }
    }

    /**
     * Waits for {@code thread} to die, deferring any interrupt of the caller until afterwards.
     *
     * @param thread thread to wait for; {@code null} and the current thread are ignored
     */
    private static void joinUninterruptibly(Thread thread) {
        if (thread == null || thread == Thread.currentThread()) {
            return;
        }
        boolean interrupted = false;
        while (thread.isAlive()) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                interrupted = true;
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Returns the current wall-clock time.
     *
     * @return milliseconds since the epoch
     */
    long getNow() {
        return System.currentTimeMillis();
    }

    /**
     * Sleeps for the given time. If the thread is interrupted, the sleep ends early and the
     * interrupt status is restored so callers' loops can observe it.
     *
     * @param mSleep time to sleep in milliseconds
     */
    private static void sleep(int mSleep) {
        try {
            Thread.sleep(mSleep);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
4

0 回答 0