嗨,我在 Python 类中包装一些 ZMQ 拉客户端时遇到了一些问题。这些类通过多处理模块在子进程中实例化和调用。当客户端是函数时一切正常,但当它们是类时 poller.poll() 挂起。
下面的代码有两个版本:一个有效,另一个无效。为什么?
import zmq
import time
import sys
import random
from multiprocessing import Process
def server_push(port="5556"):
context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.bind("tcp://*:%s" % port)
print "Running server on port: ", port
# serves only 5 request and dies
for reqnum in range(10):
if reqnum < 6:
socket.send("Continue")
else:
socket.send("Exit")
break
time.sleep (1)
def server_pub(port="5558"):
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:%s" % port)
publisher_id = random.randrange(0,9999)
print "Running server on port: ", port
# serves only 5 request and dies
for reqnum in range(10):
# Wait for next request from client
topic = random.randrange(8,10)
messagedata = "server#%s" % publisher_id
print "%s %s" % (topic, messagedata)
socket.send("%d %s" % (topic, messagedata))
time.sleep(1)
class Client:
def __init__(self,port_push, port_sub):
context = zmq.Context()
self.socket_pull = context.socket(zmq.PULL)
self.socket_pull.connect ("tcp://localhost:%s" % port_push)
print "Connected to server with port %s" % port_push
self.socket_sub = context.socket(zmq.SUB)
self.socket_sub.connect ("tcp://localhost:%s" % port_sub)
self.socket_sub.setsockopt(zmq.SUBSCRIBE, "9")
print "Connected to publisher with port %s" % port_sub
# Initialize poll set
def __call__(self):
poller = zmq.Poller()
poller.register(self.socket_pull, zmq.POLLIN)
poller.register(self.socket_sub, zmq.POLLIN)
# Work on requests from both server and publisher
should_continue = True
print "listening"
while should_continue:
print "hello"
socks = dict(poller.poll())
print poller
if self.socket_pull in socks and socks[self.socket_pull] == zmq.POLLIN:
message = self.socket_pull.recv()
print "Recieved control command: %s" % message
if message == "Exit":
print "Recieved exit command, client will stop recieving messages"
should_continue = False
if self.socket_sub in socks and socks[self.socket_sub] == zmq.POLLIN:
string = self.socket_sub.recv()
topic, messagedata = string.split()
print "Processing ... ", topic, messagedata
def client(port_push, port_sub):
context = zmq.Context()
socket_pull = context.socket(zmq.PULL)
socket_pull.connect ("tcp://localhost:%s" % port_push)
print "Connected to server with port %s" % port_push
socket_sub = context.socket(zmq.SUB)
socket_sub.connect ("tcp://localhost:%s" % port_sub)
socket_sub.setsockopt(zmq.SUBSCRIBE, "9")
print "Connected to publisher with port %s" % port_sub
# Initialize poll set
poller = zmq.Poller()
poller.register(socket_pull, zmq.POLLIN)
poller.register(socket_sub, zmq.POLLIN)
# Work on requests from both server and publisher
should_continue = True
while should_continue:
socks = dict(poller.poll())
if socket_pull in socks and socks[socket_pull] == zmq.POLLIN:
message = socket_pull.recv()
print "Recieved control command: %s" % message
if message == "Exit":
print "Recieved exit command, client will stop recieving messages"
should_continue = False
if socket_sub in socks and socks[socket_sub] == zmq.POLLIN:
string = socket_sub.recv()
topic, messagedata = string.split()
print "Processing ... ", topic, messagedata
if __name__ == "__main__":
# Now we can run a few servers
server_push_port = "5556"
server_pub_port = "5558"
Process(target=server_push, args=(server_push_port,)).start()
Process(target=server_pub, args=(server_pub_port,)).start()
#~ Process(target=client,args=(server_push_port,server_pub_port)).start()
Process(target=Client(server_push_port,server_pub_port)).start()