我应该说,在我开始使用 zmq reactor 而不是 poller 之前,一切正常。
class BaseZmqReceiver(BaseZmqNode):
__metaclass__ = ABCMeta
def __init__(self, host, port, hwm, bind, on_receive_callback):
super(BaseZmqReceiver, self).__init__(host=host, port=port, bind=bind, hwm=hwm)
self.node.on_message_callback = on_receive_callback
self.stream = ZMQStream(self.socket)
self.stream.on_recv(self.on_message_received)
ZmqLoopRunner().start()
def on_message_received(self, message):
return self.node.on_message_callback(message)
def create_node(self):
return ReceivingNode(None, None)
class ZmqLoopRunner(Thread):
def __init__(self):
super(ZmqLoopRunner, self).__init__()
self.loop = IOLoop.instance()
self.daemon = True
def run(self):
self.loop.start()
def stop(self):
self.loop.stop()
class ZmqSubscriber(BaseZmqReceiver):
def __init__(self, host, port, on_receive_callback, bind=False, hwm=1000):
super(ZmqSubscriber, self).__init__(host=host, port=port, hwm=hwm, bind=bind,
on_receive_callback=on_receive_callback)
def create_socket(self):
socket = self.context.socket(zmq.SUB)
socket.setsockopt(zmq.SUBSCRIBE, "")
return socket
这是我的 zmq 代码。
而且我基本上只是在回调中接收多部分消息。
def on_message(message):
part1, part2 = message
每隔一小时我就会收到只包含一个部分的消息。所以我得到了
TypeError: need more than one value to unpack.
编辑这里是我完整的 zmq 代码。
https://drive.google.com/file/d/0B7jQezPDaLZFQWxBMUdXQkxnS1k/edit?usp=sharing