1

我是基于 WebSockets 的实时应用程序的新手,并且停留在某一时刻。我的应用程序具有以下组件:

  1. 发生一些通用事件时触发的简单 Python 脚本。它接收数据并使用 pika 将其发送到队列 (RabbitMQ)。
  2. Tornado 应用程序(使用 sockjs-tornado)从队列(pika 异步客户端)接收消息,处理其内容,将新的应用程序状态保存到数据库并将数据广播到客户端(SockJS 客户端)。与客户端的通信只有一个方向——它们只是连接到服务器并接收数据。

问题是我不知道如何将从队列接收到的数据传递给所有客户端。我已经完成了发布/订阅交换,所以当用户连接到服务器时,会为每个用户与 RabbitMQ 建立新连接,但这不是我想要的。以下是我目前所掌握的。

常见/pika_client.py

import logging

import pika
from pika.adapters.tornado_connection import TornadoConnection


class PikaClient(object):

    def __init__(self, exchange, host='localhost', port=5672, vhost=''):
        # Default values
        self.connected = False
        self.connecting = False
        self.connection = None
        self.channel = None
        self.host = host
        self.port = port
        self.vhost = vhost
        self.exchange = exchange

        # preparing logger
        self.log = logging.getLogger(__name__)
        self.set_log_level()

    def set_log_level(self, log_level=logging.WARNING):
        self.log.setLevel(log_level)

    def connect(self):
        self.log.info("CONNECTING")
        if self.connecting:
            self.log.info('%s: Already connecting to RabbitMQ' %
                self.__class__.__name__)
            return

        self.log.info('%s: Connecting to RabbitMQ on localhost:5672' %
            self.__class__.__name__)
        self.connecting = True

        param = pika.ConnectionParameters(
            host=self.host,
            port=self.port,
            virtual_host=self.vhost
        )

        self.connection = TornadoConnection(param,
            on_open_callback=self.on_connected)
        self.connection.add_on_close_callback(self.on_closed)

    def on_connected(self, connection):
        self.log.info('%s: Connected to RabbitMQ on localhost:5672' %
            self.__class__.__name__)
        self.connected = True
        self.connection = connection
        self.connection.channel(self.on_channel_open)

    def on_channel_open(self, channel):
        self.log.info('%s: Channel Open, Declaring Exchange %s' %
            (self.__class__.__name__, self.exchange))
        self.channel = channel
        self.channel.exchange_declare(
            exchange=self.exchange,
            type="fanout",
            callback=self.on_exchange_declared
        )

    def on_exchange_declared(self, frame):
        self.log.info('%s: Exchange Declared, Declaring Queue' %
            (self.__class__.__name__))
        self.channel.queue_declare(exclusive=True,
            callback=self.on_queue_declared)

    def on_queue_declared(self, frame):
        self.log.info('%s: Queue Declared, Binding Queue %s' %
            (self.__class__.__name__, frame.method.queue))
        self.queue_name = frame.method.queue
        self.channel.queue_bind(
            exchange=self.exchange,
            queue=frame.method.queue,
            callback=self.on_queue_bound
        )

    def on_queue_bound(self, frame):
        self.log.info('%s: Queue Bound. To receive messages implement \
                       on_queue_bound method' % self.__class__.__name__)

    def on_closed(self, connection):
        self.log.info('%s: Connection Closed' % self.__class__.__name__)
        self.connected = False
        self.connection = None
        self.connecting = False
        self.channel = None
        self.connection = self.connect()

    def add_message_handler(self, handler):
        self.message_handler = handler

跟踪器.py

from sockjs.tornado import SockJSConnection

import settings
from common.pika_client import PikaClient


class QueueReceiver(PikaClient):
    """Receives messages from RabbitMQ """

    def on_queue_bound(self, frame):
        self.log.info('Consuming on queue %s' % self.queue_name)
        self.channel.basic_consume(consumer_callback=self.message_handler,
            queue=self.queue_name
        )


class TrackerConnection(SockJSConnection):

    def on_open(self, info):
        self.queue = QueueReceiver('clt')
        self.queue.add_message_handler(self.on_queue_message)
        self.queue.set_log_level(settings.LOG_LEVEL)
        self.queue.connect()

    def on_queue_message(self, channel, method, header, body):
        self.send(body)
        self.queue.channel.basic_ack(delivery_tag=method.delivery_tag)

它可以工作,但是就像我提到的,我希望只有一个连接来排队、接收消息、做一些事情并使用广播()方法向客户端广播结果。提前感谢您的帮助。

4

0 回答 0