首先,由于您希望仅通过一个接收消息的套接字进行单向通信,因此这通常意味着 PUSH-PULL。这是客户端的一个版本:
import zmq
ctx = zmq.Context.instance()
s = ctx.socket(zmq.PUSH)
url = 'tcp://127.0.0.1:5555'
s.connect(url)
while True:
msg = raw_input("msg > ")
s.send(msg)
if msg == 'quit':
break
所以一个 PUSH 套接字发送我们从 raw_input 获得的消息。应该清楚如何更改该逻辑以生成您想要的消息。一点好处是,如果您键入“退出”,客户端和服务器都将退出。
有多种方法可以实现非阻塞服务器,具体取决于应用程序的复杂性。我将展示一些示例,从最基本的到最强大/可扩展的。
所有这些服务器示例都假设在顶部,设置服务器的 PULL 套接字:
import time
import zmq
ctx = zmq.Context.instance()
s = ctx.socket(zmq.PULL)
url = 'tcp://127.0.0.1:5555'
s.bind(url)
第一个示例是简单的非阻塞接收,zmq.Again
如果没有准备好接收的消息,则会引发异常:
# server0.py
while True:
try:
msg = s.recv(zmq.NOBLOCK) # note NOBLOCK here
except zmq.Again:
# no message to recv, do other things
time.sleep(1)
else:
print("received %r" % msg)
if msg == 'quit':
break
但是这种模式很难扩展到非常简单的情况之外。第二个示例使用轮询器来检查套接字上的事件:
# server1.py
poller = zmq.Poller()
poller.register(s)
while True:
events = dict(poller.poll(0))
if s in events:
msg = s.recv()
print("received %r" % msg)
if msg == 'quit':
break
else:
# no message to recv, do other things
time.sleep(1)
在这个玩具示例中,这与第一个非常相似。poller.register
但是,与第一个不同的是,通过对 的进一步调用或将非零超时传递给 ,很容易扩展到许多套接字或事件poller.poll
。
最后一个例子使用了一个事件循环,并在消息到达时注册了一个回调。您可以使用这种模式构建非常复杂的应用程序,这是一种相当简单的编写代码的方法,只有在有工作要做时才有效。
# server2.py
from zmq.eventloop import ioloop
from zmq.eventloop.zmqstream import ZMQStream
def print_msg(msg):
print("received %r" % ' '.join(msg))
if msg[0] == 'quit':
ioloop.IOLoop.instance().stop()
# register the print_msg callback to be fired
# whenever there is a message on our socket
stream = ZMQStream(s)
stream.on_recv(print_msg)
# do other things in the meantime
tic = time.time()
def do_other_things():
print("%.3f" % (time.time() - tic))
pc = ioloop.PeriodicCallback(do_other_things, 1000)
pc.start()
# start the eventloop
ioloop.IOLoop.instance().start()
这就是处理 zmq 消息而不阻塞的一些基本方法。您可以将这些示例一起作为要点。