0

我正在尝试从活动的 WebSocket 连接(具有活动的订阅并正在接收消息)将消息发布到 RabbitMQ 队列,然后使用该消息并将其发送回相同的 WS 连接。我正在使用 Python 的websocketpika库来执行此操作。问题是,在处理消费的消息时,我不知道如何访问 websocket 对象来实际发送消息。同样的问题是将消息发布到队列。这是它的外观:

websocket.py

def on_message(ws, message):
    """Function that runs when a message is received"""
    # Would publish message here with something like channel.basic_publish()


def on_error(ws, error):
    """Function that runs when an error is thrown"""
    logging.error(error)


def on_close(ws, close_status_code, close_msg):
    """Function that runs when the connection is closed"""
    logging.info(close_msg, close_status_code)


def on_open(ws):
    """Function that runs when the connection is established"""
    ws.send(subscribe_all_messages_query())


def init_connection():
    
    websocket_url = "<some ws url>"
    websocket.enableTrace(False)

    ws = websocket.WebSocketApp(
        websocket_url,
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )
    ws.run_forever()

兔子MQ.py


connection = pika.BlockingConnection(
        pika.ConnectionParameters("localhost")
    )
    channel = connection.channel()
    channel.queue_declare(queue="message")

def message_callback(body):
    """Callback function for RabbitMQ queue"""
    # Would access ws object here and send the message with ws.send()


def add_message_to_queue(
    message: str,
    queue: str = "message",
):
    """Function that adds given message to given queue"""
    channel.basic_publish(exchange="", routing_key=queue, body=message)


def init_queue_consuming(channel, callback = message_callback, queue: str = "message"):
    """Initiates the consuming of messages from the given queue"""
    channel.basic_consume(queue=queue, auto_ack=True, on_message_callback=callback)
    channel.start_consuming()

一般流程: 从 WS 接收消息->将消息发布到队列->从队列中消费消息->通过 WS 发送回消息

有没有关于如何做到这一点的提示?

4

0 回答 0