6

这是我的脚本。


#!/usr/bin/env python

import traceback
import sys
import zmq
from time import sleep

print "Creating the zmq.Context"
context = zmq.Context()

print "Binding the publisher to the local socket at port 5557"
sender = context.socket(zmq.PUB)
sender.bind("tcp://*:5557")

print "Binding the subscriber to the local socket at port 5557"
receiver = context.socket(zmq.SUB)
receiver.connect("tcp://*:5557")

print "Setting the subscriber option to get only those originating from \"B\""
receiver.setsockopt(zmq.SUBSCRIBE, "B")

print "Waiting a second for the socket to be created."
sleep(1)

print "Sending messages"
for i in range(1,10):
    msg = "msg %d" % (i)
    env = None
    if i % 2 == 0:
        env = ["B", msg]
    else:
        env = ["A", msg]
    print "Sending Message:  ", env
    sender.send_multipart(env)

print "Closing the sender."
sender.close()

failed_attempts = 0
while failed_attempts < 3:
    try:
        print str(receiver.recv_multipart(zmq.NOBLOCK))
    except:
        print traceback.format_exception(*sys.exc_info())
        failed_attempts += 1 

print "Closing the receiver."
receiver.close()

print "Terminating the context."
context.term()

"""
Output:

Creating the zmq.Context
Binding the publisher to the local socket at port 5557
Binding the subscriber to the local socket at port 5557
Setting the subscriber option to get only those originating from "B"
Waiting a second for the socket to be created.
Sending messages
Sending Message:   ['A', 'msg 1']
Sending Message:   ['B', 'msg 2']
Sending Message:   ['A', 'msg 3']
Sending Message:   ['B', 'msg 4']
Sending Message:   ['A', 'msg 5']
Sending Message:   ['B', 'msg 6']
Sending Message:   ['A', 'msg 7']
Sending Message:   ['B', 'msg 8']
Sending Message:   ['A', 'msg 9']
Closing the sender.
['B', 'msg 2']
['B', 'msg 4']
['B', 'msg 6']
['B', 'msg 8']
['Traceback (most recent call last):\n', '  File "./test.py", line 43, in \n    print str(receiver.recv_multipart(zmq.NOBLOCK))\n', '  File "socket.pyx", line 611, in zmq.core.socket.Socket.recv_multipart (zmq/core/socket.c:5181)\n', '  File "socket.pyx", line 514, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4811)\n', '  File "socket.pyx", line 548, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4673)\n', '  File "socket.pyx", line 99, in zmq.core.socket._recv_copy (zmq/core/socket.c:1344)\n', 'ZMQError: Resource temporarily unavailable\n']
['Traceback (most recent call last):\n', '  File "./test.py", line 43, in \n    print str(receiver.recv_multipart(zmq.NOBLOCK))\n', '  File "socket.pyx", line 611, in zmq.core.socket.Socket.recv_multipart (zmq/core/socket.c:5181)\n', '  File "socket.pyx", line 514, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4811)\n', '  File "socket.pyx", line 548, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4673)\n', '  File "socket.pyx", line 99, in zmq.core.socket._recv_copy (zmq/core/socket.c:1344)\n', 'ZMQError: Resource temporarily unavailable\n']
['Traceback (most recent call last):\n', '  File "./test.py", line 43, in \n    print str(receiver.recv_multipart(zmq.NOBLOCK))\n', '  File "socket.pyx", line 611, in zmq.core.socket.Socket.recv_multipart (zmq/core/socket.c:5181)\n', '  File "socket.pyx", line 514, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4811)\n', '  File "socket.pyx", line 548, in zmq.core.socket.Socket.recv (zmq/core/socket.c:4673)\n', '  File "socket.pyx", line 99, in zmq.core.socket._recv_copy (zmq/core/socket.c:1344)\n', 'ZMQError: Resource temporarily unavailable\n']
Closing the receiver.
Terminating the context.
"""

而且,问题是......为什么这段代码不起作用?

[编辑] 在 zeromq 邮件列表上获得超快速响应后,我更新了上面的代码。

4

2 回答 2

10

学分:查克·雷姆斯

您可能需要在套接字创建步骤(绑定、连接、setsockopt)和消息的实际传输之间“休眠”。绑定和连接操作是异步的,因此当您到达发送所有消息的逻辑时,它们可能无法完成。在这种情况下,任何通过 PUB 套接字发送的消息都将被丢弃,因为 zmq_bind() 操作在另一个套接字成功连接到它之前不会创建队列。

作为旁注,您不需要在此示例中创建 2 个上下文。两个套接字都可以在同一个上下文中创建。它没有伤害,但也没有必要。

学分:彼得

第 1 章末尾有一个“问题解决器”解释了这一点。

某些套接字类型(ROUTER 和 PUB)会静默丢弃没有收件人的消息。正如查克所说,连接是异步的,大约需要 100 毫秒。如果您启动两个线程,绑定一侧,连接另一侧,然后立即开始通过这种套接字类型发送数据,您将丢失前 100 毫秒的数据(大约)。

睡觉是一个残酷的“证明它有效”的选择。实际上,您会以某种方式同步,或者(更典型地)期望消息丢失作为正常启动的一部分(即将发布的数据视为没有明确开始或结束的纯广播)。

有关详细信息,请参阅天气更新示例、syncpub 和 syncsub。

于 2011-06-05T20:15:18.963 回答
2

死灵发布,但对于那些对睡眠以外的解决方案感兴趣的人,有监视器。

您可以设置监视器回调并在 ZMQ_EVENT_CONNECTED 事件上被调用。

请参阅http://api.zeromq.org/3-3:zmq-ctx-set-monitor上的详细信息和示例。

于 2019-08-09T17:32:29.083 回答