我正在尝试从活动的 WebSocket 连接(具有活动的订阅并正在接收消息)将消息发布到 RabbitMQ 队列,然后使用该消息并将其发送回相同的 WS 连接。我正在使用 Python 的websocket和pika库来执行此操作。问题是,在处理消费的消息时,我不知道如何访问 websocket 对象来实际发送消息。同样的问题是将消息发布到队列。这是它的外观:
websocket.py:
def on_message(ws, message):
"""Function that runs when a message is received"""
# Would publish message here with something like channel.basic_publish()
def on_error(ws, error):
"""Function that runs when an error is thrown"""
logging.error(error)
def on_close(ws, close_status_code, close_msg):
"""Function that runs when the connection is closed"""
logging.info(close_msg, close_status_code)
def on_open(ws):
"""Function that runs when the connection is established"""
ws.send(subscribe_all_messages_query())
def init_connection():
websocket_url = "<some ws url>"
websocket.enableTrace(False)
ws = websocket.WebSocketApp(
websocket_url,
on_open=on_open,
on_message=on_message,
on_error=on_error,
on_close=on_close,
)
ws.run_forever()
兔子MQ.py:
connection = pika.BlockingConnection(
pika.ConnectionParameters("localhost")
)
channel = connection.channel()
channel.queue_declare(queue="message")
def message_callback(body):
"""Callback function for RabbitMQ queue"""
# Would access ws object here and send the message with ws.send()
def add_message_to_queue(
message: str,
queue: str = "message",
):
"""Function that adds given message to given queue"""
channel.basic_publish(exchange="", routing_key=queue, body=message)
def init_queue_consuming(channel, callback = message_callback, queue: str = "message"):
"""Initiates the consuming of messages from the given queue"""
channel.basic_consume(queue=queue, auto_ack=True, on_message_callback=callback)
channel.start_consuming()
一般流程: 从 WS 接收消息->将消息发布到队列->从队列中消费消息->通过 WS 发送回消息
有没有关于如何做到这一点的提示?