4

我有一个应用程序,其中每个 websocket 连接(在龙卷风打开回调中)都会为zmq.SUB现有zmq.FORWARDER设备创建一个套接字。想法是从 zmq 接收数据作为回调,然后可以通过 websocket 连接将其中继到前端客户端。

https://gist.github.com/abhinavsingh/6378134

ws.py

import zmq
from zmq.eventloop import ioloop
from zmq.eventloop.zmqstream import ZMQStream
ioloop.install()

from tornado.websocket import WebSocketHandler
from tornado.web import Application
from tornado.ioloop import IOLoop
ioloop = IOLoop.instance()

class ZMQPubSub(object):

    def __init__(self, callback):
        self.callback = callback

    def connect(self):
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.SUB)
        self.socket.connect('tcp://127.0.0.1:5560')
        self.stream = ZMQStream(self.socket)
        self.stream.on_recv(self.callback)

    def subscribe(self, channel_id):
        self.socket.setsockopt(zmq.SUBSCRIBE, channel_id)

class MyWebSocket(WebSocketHandler):

    def open(self):
        self.pubsub = ZMQPubSub(self.on_data)
        self.pubsub.connect()
        self.pubsub.subscribe("session_id")
        print 'ws opened'

    def on_message(self, message):
        print message

    def on_close(self):
        print 'ws closed'

    def on_data(self, data):
        print data

def main():
    application = Application([(r'/channel', MyWebSocket)])
    application.listen(10001)
    print 'starting ws on port 10001'
    ioloop.start()

if __name__ == '__main__':
    main()

转发器.py

import zmq

def main():
    try:
        context = zmq.Context(1)

        frontend = context.socket(zmq.SUB)
        frontend.bind('tcp://*:5559')
        frontend.setsockopt(zmq.SUBSCRIBE, '')

        backend = context.socket(zmq.PUB)
        backend.bind('tcp://*:5560')

        print 'starting zmq forwarder'
        zmq.device(zmq.FORWARDER, frontend, backend)
    except KeyboardInterrupt:
        pass
    except Exception as e:
        logger.exception(e)
    finally:
        frontend.close()
        backend.close()
        context.term()

if __name__ == '__main__':
    main()

发布.py

import zmq

if __name__ == '__main__':
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.connect('tcp://127.0.0.1:5559')
    socket.send('session_id helloworld')
    print 'sent data for channel session_id'

但是,我的ZMQPubSub班级似乎根本没有收到任何数据。

我进一步实验并意识到我需要ioloop.IOLoop.instance().start()在. 但是,这只会阻止执行。on_recvZMQPubSub

我也尝试将main.ioloop实例传递给ZMQStream构造函数,但也无济于事。

有没有一种方法可以绑定ZMQStream到现有main.ioloop实例而不阻塞其中的流MyWebSocket.open

4

2 回答 2

5

在您现在完整的示例中,只需将frontend您的转发器更改为 PULL 套接字,将您的发布者套接字更改为 PUSH,它的行为应该符合您的预期。

与此处相关的套接字选择的一般原则:

  • 当您想向准备接收它的每个人(可能没有人)发送消息时使用 PUB/SUB
  • 当您想向一个对等点发送消息时使用 PUSH/PULL,等待他们准备好

最初您可能只想要 PUB-SUB,但是一旦您开始查看每个套接字对,您就会意识到它们非常不同。连接绝对是frontend-websocketPUB-SUB - 您可能有零对多的接收器,并且您只想将消息发送给在消息通过时碰巧可用的每个人。但后端不同——只有一个接收者,它肯定想要来自发布者的每条消息。

所以你有它 - 后端应该是 PULL 和前端 PUB。你所有的插座:

PUSH -> [PULL-PUB] -> SUB

publisher.py:socket 是PUSH,连接到backenddevice.py

forwarder.py: backendis PULL, frontendis PUB ws.py:SUB连接并订阅forwarder.frontend.

在您的情况下,导致 PUB/SUB 在后端失败的相关行为是慢加入者综合症在指南中进行了描述。本质上,订阅者需要有限的时间来告诉发布者有订阅,所以如果你在打开 PUB 套接字后立即发送消息,很可能还没有被告知它有任何订​​阅者,所以它只是丢弃消息。

于 2013-08-31T05:17:03.500 回答
1

ZeroMq 订阅者必须订阅他们希望接收的消息;我在你的代码中没有看到。我相信 Python 的方式是这样的:

self.socket.setsockopt(zmq.SUBSCRIBE, "")
于 2013-08-28T13:31:32.697 回答