2

我目前正在探索测试我的 zeromq 应用程序的可能性。我的印象是我可以在同一个线程中有一个发布者/订阅者,让发布者发布和订阅者订阅它而不会丢失消息。然而,当我让发布者发送几条消息时,没有一条消息能传递给订阅者。

这是我使用的代码:

import zmq

def main():
    ctx = zmq.Context.instance()
    sender = ctx.socket(zmq.PUB)
    sender.setsockopt(zmq.HWM, 1000)
    sender.bind('tcp://*:10001')

    rcvr = ctx.socket(zmq.SUB)
    rcvr.setsockopt(zmq.HWM, 1000)
    rcvr.connect('tcp://127.0.0.1:10001')
    rcvr.setsockopt(zmq.SUBSCRIBE, "")

    for i in range(100):
        sender.send('%i' % i)

    while True:
        try:
            print rcvr.recv(zmq.NOBLOCK)
        except zmq.ZMQError:
            break


if __name__ == '__main__':
    main()

运行它时,我没有得到任何输出。

令我印象深刻的是,接收方在发送方发送之前已连接,因此应该对这些消息进行排队。或者这是一个完全错误的假设,我应该使用 PUSH/PULL 代替?

4

3 回答 3

2

我认为这是ZeroMQ 指南中描述的慢连接问题的一个案例。

这种“慢木工”症状经常影响到足够多的人,我们将对其进行详细解释。

我认为主要问题是在订阅者套接字开始侦听之前所有消息都已发送,并且消息飞过并被丢弃。在设置套接字和发送消息之间设置延迟不起作用,因为在接收器开始侦听之前已经发送了最后一条消息。

正如您所建议的,推/拉套接字在内存中进行队列作业。您可以像这样在单个进程中的套接字之间发送作业

# pushpull.py
import zmq

def main():
    ctx = zmq.Context()
    sender = ctx.socket(zmq.PUSH)
    sender.bind('tcp://*:10001')

    rcvr = ctx.socket(zmq.PULL)
    rcvr.connect('tcp://127.0.0.1:10001')

    for i in range(100):
        sender.send_unicode('%i' % i)

    while True:
        msg = rcvr.recv()
        print(msg)

if __name__ == '__main__':
    main()

或者如果你想使用 pub/sub 套接字,我们需要两个进程和一个time.sleep(1)在套接字设置和消息发送之间:

首先启动接收器

# rcvr.py
import zmq

def main():
    ctx = zmq.Context()
    rcvr = ctx.socket(zmq.SUB)
    rcvr.connect('tcp://127.0.0.1:10001')
    rcvr.setsockopt_string(zmq.SUBSCRIBE, "")

    while True:
        msg = rcvr.recv()
        print(msg)

if __name__ == '__main__':
    main()

然后发件人,

# sender.py
import zmq
import time

def main():
    ctx = zmq.Context()
    sender = ctx.socket(zmq.PUB)
    sender.bind('tcp://*:10001')

    time.sleep(1)
    for i in range(100):
        sender.send_unicode('%i' % i)

if __name__ == "__main__":
    main()

受到:

b'0'
b'1'
b'2'
b'3' ...

我目前正在 Python 3.3 和 pyzmq 13.1.0 中使用出色的WinPython发行版,因此 zmq 调用中的一些字符串处理以及打印功能有些不同。希望能帮助到你。

于 2013-05-22T12:49:05.753 回答
1

You should be connecting your SUB socket to port 10000 not 10001. Currently the SUB socket is waiting for a publisher and the PUB socket is waiting for a subscriber. 0mq's feature of allowing 'clients' to connect without 'servers' already being present also means there is no error thrown when you connect to port 10001 and that is by design.

于 2013-04-13T23:46:41.773 回答
0

让我印象深刻的是,接收方在发送方发送之前就已连接

这实际上不是真的——接收者已经开始了连接过程,但这并不意味着这个过程已经完成。连接是异步的。

如果您实际上将其用于进程内通信,我建议您使用inproc传输,这不是问题:

url = 'inproc://whatever'
sender.bind(url)
...
recvr.connect(url)
于 2013-04-15T18:14:55.567 回答