0

我应该说,在我开始使用 zmq reactor 而不是 poller 之前,一切正常。

class BaseZmqReceiver(BaseZmqNode):
    __metaclass__ = ABCMeta

    def __init__(self, host, port, hwm, bind, on_receive_callback):
        super(BaseZmqReceiver, self).__init__(host=host, port=port, bind=bind, hwm=hwm)
        self.node.on_message_callback = on_receive_callback
        self.stream = ZMQStream(self.socket)
        self.stream.on_recv(self.on_message_received)
        ZmqLoopRunner().start()

    def on_message_received(self, message):
        return self.node.on_message_callback(message)

    def create_node(self):
        return ReceivingNode(None, None)

class ZmqLoopRunner(Thread):

    def __init__(self):
        super(ZmqLoopRunner, self).__init__()
        self.loop = IOLoop.instance()
        self.daemon = True

    def run(self):
        self.loop.start()

    def stop(self):
        self.loop.stop()


class ZmqSubscriber(BaseZmqReceiver):
    def __init__(self, host, port, on_receive_callback, bind=False, hwm=1000):
        super(ZmqSubscriber, self).__init__(host=host, port=port, hwm=hwm, bind=bind,
                                            on_receive_callback=on_receive_callback)

    def create_socket(self):
        socket = self.context.socket(zmq.SUB)
        socket.setsockopt(zmq.SUBSCRIBE, "")
        return socket

这是我的 zmq 代码。

而且我基本上只是在回调中接收多部分消息。

def on_message(message):
    part1, part2 = message

每隔一小时我就会收到只包含一个部分的消息。所以我得到了

TypeError: need more than one value to unpack.

编辑这里是我完整的 zmq 代码。

https://drive.google.com/file/d/0B7jQezPDaLZFQWxBMUdXQkxnS1k/edit?usp=sharing

4

0 回答 0