1

我一直在使用 ZeroMQ 的请求/响应套接字来在 Web 应用程序和用于卸载处理的从属应用程序之间交换消息。我注意到在少数情况下,并非所有发送的 ZMQ 消息都被对方实际接收。甚至奇怪的是,即使使用IPC协议也会发生这种情况,我认为 IPC 协议会非常可靠。

什么可能导致发送的消息没有被传递,但没有产生任何错误?

这是客户端代码的示例:

# ironic
class ReliableClient(object):
    def _reconnect(self):
        if self.socket:
            self.socket.close()
            self.socket = None

        self.socket = self.context.socket(zmq.REQ)
        self.socket.connect(self.server_url)

        # Give the server 2 sec to respond
        self.socket.RCVTIMEO = 2000
        self.socket.SNDTIMEO = 2000

        self.socket.LINGER = 3

    def __init__(self, server_url=None, server_name=None):
        self.socket = None
        self.server_url = server_url

        if server_name is None:
            self.server_name = server_url
        else:
            self.server_name  = server_name

        self.lock = threading.Lock()
        self.context = zmq.Context()

        self._reconnect()

    def msg(self, msg):
        raw_out = dumps(msg)

        # send
        self.lock.acquire()
        try:
            self.socket.send(
                raw_out,
                copy=True
            )
        except zmq.ZMQError as ex:
            log.exception(ex, '%s: failed to send', self.server_name)
            self.lock.release()
            raise CommunicationError('failed to send')

        # receive
        try:
            raw_in = self.socket.recv()
        except zmq.ZMQError as ex:
            log.exception(ex, '%s: failed to receive', self.server_name)
            raise CommunicationError('failed to receive')
        finally:
            self.lock.release()

        msg_in = loads(raw_in)  

        return msg_in
4

3 回答 3

3

我不熟悉 IPC 协议,但我已经广泛使用 ZMQ/TCP。

如果您正确使用它,即使是 TCP 上最简单的 ZMQ REQ/REP 模式也永远不会丢弃消息。如果您的网络连接或远程端点出现故障,它可能会无限期挂起,但它永远不会静默失败。某些套接字在设计上会在某些情况下丢弃消息。例如,有些可能会在达到 HWM 时丢弃消息。

于 2013-08-15T17:59:34.917 回答
2

什么可能导致发送的消息没有被传递,但没有产生任何错误?

如果您使用ROUTER套接字将消息从服​​务器传递到 zmq 工作进程,ROUTER则默认情况下,s 将丢弃它们无法传递的任何出站消息。“无法交付”是什么意思?ROUTER 维护客户端身份到客户端连接的内部映射,并且由于路由器上的所有消息都有一个身份,要么由客户端提供,要么由路由器自动分配,任何没有相应连接的出站消息路线,将被无声地丢弃。

您可以通过告诉 ROUTER 报告无法投递的消息来识别这种情况何时发生,换句话说,当 ROUTER 无法投递消息时会产生错误。在Java中,方法是routerSocket.setRouterMandatory(true),你只需要找到对应的python(我不是py人,哈哈)

如果您确定 ROUTER 正在丢弃消息,那么问题就变成了为什么?就我而言,我有一个 zmq 客户端,它在不同的线程上发送和接收服务器消息,而接收线程的连接速度不够快,无法从服务器发出初始的“Ok”响应,所以这只是时间问题客户。

希望有帮助

于 2013-08-14T19:31:56.073 回答
0

如果保证两个系统始终处于活动状态(24X7)或仅在两个系统处于活动状态时才发送消息,则 ZeroMQ 可以用作在系统之间传输小事件的绝佳方式,这是一个很好的选择。

由于订阅者速度慢,我们在 ZeroMQ 发布-订阅模式下发送大量消息时遇到问题,并且我们开始在中间丢失消息。后来我们转向了 ActiveMQ 嵌入式代理模式,它将消息保存在内存中,直到消费者消费它们。ZeroMQ 的好处是,即使没有发布者或者发布者稍后会出现,订阅者也可以启动。

Active MQ 的限制,在 Producer 创建 Queue 之前,无法启动 Consumer 从 Queue 中读取。它显然会抛出异常。我们也可以同时使用这两个库并充分利用它们。

于 2015-01-03T11:05:38.677 回答