0

我确实在我的代码中使用了 pyzmq 2.2.0.1(Windows 或 Linux 上的 python27),当我运行它时它可以工作(也是 python 线程):

def test_zmq_inverted_pub_sub():
    import zmq
    import time
    ctx = zmq.Context()
    sub = ctx.socket(zmq.SUB)
    pub = ctx.socket(zmq.PUB)
    sub.bind('tcp://127.0.0.1:5555')
    sub.setsockopt(zmq.SUBSCRIBE, b'')
    time.sleep(3)
    pub.connect('tcp://127.0.0.1:5555')
    pub.send(b'0')
    assert sub.poll(3)

当我将 zmq 升级到 13.1.0(现在升级到 14.0.0)时,我发现这个测试不起作用。

我尝试搜索有关它的一些更改,但没有找到。当我在不同的进程上创建这个队列时,它可以工作,但我不想为我的测试打开新进程。有什么解释为什么它不起作用,我怎样才能正确地做这个测试?

谢谢。

4

1 回答 1

1

这主要是因为订阅是过滤 PUB 端的,从 zeromq 3.0 开始。订阅的传播需要有限的时间,因此您在建立连接后立即尝试发送的事实意味着您可能在 PUB 套接字知道它有任何订​​阅者之前发送。

有一个次要问题是已知错误,特定于 SUB 绑定和 PUB 连接的时间。结果是 SUB 套接字直到连接建立后第一次轮询/recvs 才告诉 PUB 它的订阅。

此版本的测试将通过:

def test_zmq_inverted_pub_sub():
    import zmq
    import time
    ctx = zmq.Context()
    sub = ctx.socket(zmq.SUB)
    pub = ctx.socket(zmq.PUB)
    sub.bind('tcp://127.0.0.1:5555')
    sub.setsockopt(zmq.SUBSCRIBE, b'')
    pub.connect('tcp://127.0.0.1:5555')
    # the first sub.poll is a workaround to force subscription propagation
    for i in range(2):
        pub.send(b'hi')
        evt = sub.poll(1)
        if evt:
            break
    assert evt
于 2013-11-19T01:35:58.047 回答