1

我正在尝试REQ/REP用 Java 实现 ZMQ 模型

我有一个服务器角色,在 5564 后运行,它充当回复者

ZMQ.Socket repSock = context.socket(ZMQ.REP);

我有一个客户端角色,在 5563 后运行

ZMQ.Socket syncclient = context.socket(ZMQ.REQ);

我在中间有一个代理服务器,它传递请求和响应

ZMQ.proxy(reqSocket, repSocket, null);

拥有代理的好处是我可以添加多个服务器

repSocket.connect("tcp://" + addr.getHostAddress() + ":" + port);

哪个工作正常。

现在,当我从代理中删除服务器节点时

repSocket.disconnect("tcp://" + addr.getHostAddress() + ":" + port);

客户端卡住了,因为已经发出请求并且REQ-socket 等待响应。

所以这个过程停留在syncclient.recvStr()

for (int request_nbr = 0; request_nbr < (request_nbr + 1); request_nbr++) {          

    syncclient.send(str.getBytes(),0);
    System.out.println("Send Dataaaa....... " );
    String data = syncclient.recvStr(Charset.defaultCharset());
    System.out.println(" here.. " +data);

    request_nbr++;
}  

我搜索并找不到跟踪REQ-socket的方法


我需要两件事中的任何一件:

  1. 一种跟踪Socket我即将断开连接的-instance的方法,等待所有消息都被处理,这样syncclient.recvStr()就不会被阻止

  2. 一种重置syncclient-socket的方法,这样我就可以不间断地得到REQ/REP响应

4

1 回答 1

1

在现实世界的场景中,避免使用 ZeroMQ .send()/.recv()方法的阻塞模式并更好地使用.poll().

虽然这可能需要更多 SLOC 的代码,但结果会让您处于控制状态,而阻塞的 SLOC 会从您的代码中获取所有控制权,您无法对此做太多事情,直到(如果有的话)下一条消息被传递。这是一种非常错误的设计实践,除了最简单的教科书示例之外,这实际上是现实世界的反模式。

所以,不要指望问题 2会以某种方式神奇地解决,这不是 ZeroMQ API 的一部分(出于许多相当大声宣传的原因)。.setsockopt( ZMQ.REQ_RELAXED, 1 )如果 API 版本和使用环境允许,最好在两者之间做出决定,或者根本不使用琐碎的REQ/REP模式(因为已知有陷入无法挽救的相互死锁的风险(请参阅我关于这个主题的其他帖子,其中这种现象被说明和无数次解释))。

以类似的方式,如果您从未阅读过 ZeroMQ 规范和/或文档以及 ZeroMQ“最佳实践” ,那么问问题 1似乎是合理的。在此花费了一些时间后,您的选择将一目了然。没有这样的内置工具可以做到这一点。如果需要为她/他自己的需要添加任何类似的非核心逻辑,可以添加一些附加组件。唯一能间接影响行为的设置aSocket.close()在非空(消息仍在等待传递)。.setsockopt( ZMQ.LINGER, 0 )aSocket

进入分布式系统设计就像进入一个新世界。不保证任何序列(发生非串行代码执行路径)。无法对远程实体、它们的状态、它们的故障、它们的存在、它们的实际 ZeroMQ API 版本进行任何本地控制。

确实是一个充满挑战的世界。

注意:
您可能已经知道,一个可以在不使用代理的情况下对多个远程端进行.connect() aSocket实例化(更好的接入点到实例化)。aSocket通过将值.setsockopt()调整ZMQ.IMMEDIATE为 1,将有助于更好地管理循环分发策略,而与用于实际消息传递的传输类无关 ( { tcp:// | ipc:// | vmci:// | pgm:// | epgm:// | inproc:// })。这一切都触手可及。

于 2017-10-30T12:21:22.827 回答