我是基于 WebSockets 的实时应用程序的新手,并且停留在某一时刻。我的应用程序具有以下组件:
- 发生一些通用事件时触发的简单 Python 脚本。它接收数据并使用 pika 将其发送到队列 (RabbitMQ)。
- Tornado 应用程序(使用 sockjs-tornado)从队列(pika 异步客户端)接收消息,处理其内容,将新的应用程序状态保存到数据库并将数据广播到客户端(SockJS 客户端)。与客户端的通信只有一个方向——它们只是连接到服务器并接收数据。
问题是我不知道如何将从队列接收到的数据传递给所有客户端。我已经完成了发布/订阅交换,所以当用户连接到服务器时,会为每个用户与 RabbitMQ 建立新连接,但这不是我想要的。以下是我目前所掌握的。
常见/pika_client.py:
import logging
import pika
from pika.adapters.tornado_connection import TornadoConnection
class PikaClient(object):
def __init__(self, exchange, host='localhost', port=5672, vhost=''):
# Default values
self.connected = False
self.connecting = False
self.connection = None
self.channel = None
self.host = host
self.port = port
self.vhost = vhost
self.exchange = exchange
# preparing logger
self.log = logging.getLogger(__name__)
self.set_log_level()
def set_log_level(self, log_level=logging.WARNING):
self.log.setLevel(log_level)
def connect(self):
self.log.info("CONNECTING")
if self.connecting:
self.log.info('%s: Already connecting to RabbitMQ' %
self.__class__.__name__)
return
self.log.info('%s: Connecting to RabbitMQ on localhost:5672' %
self.__class__.__name__)
self.connecting = True
param = pika.ConnectionParameters(
host=self.host,
port=self.port,
virtual_host=self.vhost
)
self.connection = TornadoConnection(param,
on_open_callback=self.on_connected)
self.connection.add_on_close_callback(self.on_closed)
def on_connected(self, connection):
self.log.info('%s: Connected to RabbitMQ on localhost:5672' %
self.__class__.__name__)
self.connected = True
self.connection = connection
self.connection.channel(self.on_channel_open)
def on_channel_open(self, channel):
self.log.info('%s: Channel Open, Declaring Exchange %s' %
(self.__class__.__name__, self.exchange))
self.channel = channel
self.channel.exchange_declare(
exchange=self.exchange,
type="fanout",
callback=self.on_exchange_declared
)
def on_exchange_declared(self, frame):
self.log.info('%s: Exchange Declared, Declaring Queue' %
(self.__class__.__name__))
self.channel.queue_declare(exclusive=True,
callback=self.on_queue_declared)
def on_queue_declared(self, frame):
self.log.info('%s: Queue Declared, Binding Queue %s' %
(self.__class__.__name__, frame.method.queue))
self.queue_name = frame.method.queue
self.channel.queue_bind(
exchange=self.exchange,
queue=frame.method.queue,
callback=self.on_queue_bound
)
def on_queue_bound(self, frame):
self.log.info('%s: Queue Bound. To receive messages implement \
on_queue_bound method' % self.__class__.__name__)
def on_closed(self, connection):
self.log.info('%s: Connection Closed' % self.__class__.__name__)
self.connected = False
self.connection = None
self.connecting = False
self.channel = None
self.connection = self.connect()
def add_message_handler(self, handler):
self.message_handler = handler
跟踪器.py
from sockjs.tornado import SockJSConnection
import settings
from common.pika_client import PikaClient
class QueueReceiver(PikaClient):
"""Receives messages from RabbitMQ """
def on_queue_bound(self, frame):
self.log.info('Consuming on queue %s' % self.queue_name)
self.channel.basic_consume(consumer_callback=self.message_handler,
queue=self.queue_name
)
class TrackerConnection(SockJSConnection):
def on_open(self, info):
self.queue = QueueReceiver('clt')
self.queue.add_message_handler(self.on_queue_message)
self.queue.set_log_level(settings.LOG_LEVEL)
self.queue.connect()
def on_queue_message(self, channel, method, header, body):
self.send(body)
self.queue.channel.basic_ack(delivery_tag=method.delivery_tag)
它可以工作,但是就像我提到的,我希望只有一个连接来排队、接收消息、做一些事情并使用广播()方法向客户端广播结果。提前感谢您的帮助。