我确实在我的代码中使用了 pyzmq 2.2.0.1(Windows 或 Linux 上的 python27),当我运行它时它可以工作(也是 python 线程):
def test_zmq_inverted_pub_sub():
import zmq
import time
ctx = zmq.Context()
sub = ctx.socket(zmq.SUB)
pub = ctx.socket(zmq.PUB)
sub.bind('tcp://127.0.0.1:5555')
sub.setsockopt(zmq.SUBSCRIBE, b'')
time.sleep(3)
pub.connect('tcp://127.0.0.1:5555')
pub.send(b'0')
assert sub.poll(3)
当我将 zmq 升级到 13.1.0(现在升级到 14.0.0)时,我发现这个测试不起作用。
我尝试搜索有关它的一些更改,但没有找到。当我在不同的进程上创建这个队列时,它可以工作,但我不想为我的测试打开新进程。有什么解释为什么它不起作用,我怎样才能正确地做这个测试?
谢谢。