可以使用 RabbitMQ 通道在一个 Python 线程中完成消费和发布吗?
5 回答
我认为你不应该想要。MQ 表示异步处理。在我看来,在同一个线程中进行消费和生产违背了目的。
我建议看看 Celery ( http://celery.readthedocs.org/en/latest/ ) 来管理工作任务。这样,您就不需要直接与 RMQ 集成,因为它会为您处理生产和消费。
但是,如果您确实希望直接与 RMQ 集成并管理自己的工作人员,请查看 Kombu ( http://kombu.readthedocs.org/en/latest/ ) 以了解集成。有非阻塞的消费者和生产者允许你在同一个事件循环中拥有两者。
我认为你的问题的简单答案是肯定的。但这取决于你想做什么。我的猜测是你有一个循环在一个通道上从你的线程中消耗,经过一些(小或大)处理后,它决定将它发送到另一个通道上的另一个队列(或交换)然后我看不出有任何问题一点也不。尽管最好将其分派到不同的线程,但这不是必需的。
如果您提供有关您的流程的更多详细信息,那么它可能有助于给出更具体的答案。
Kombu是一个用于处理 RabbitMQ 的常用 python 库(Celery 在后台使用它)。在这里值得指出的是,我尝试过的最简单使用 Kombu 的问题的答案是“不 - 你不能在同一个消费者回调线程上接收和发布。 ”
具体来说,如果队列中有几条消息,消费者已经为该主题注册了回调,并且该回调进行了一些处理并发布了结果,那么结果的发布将导致队列中的第二条消息在它之前点击回调已从第一条消息的发布返回 - 因此您最终会递归调用回调。如果队列中有 n 条消息,则调用堆栈在展开之前将结束 n 条消息。显然,它很快就会爆发。
一种解决方案(不一定是最好的)是让回调只是将消息发布到消费者内部的一个简单队列中,该队列可以在主进程线程上处理(即在回调线程之外)
def process_message(self, body: str, message: Message):
# Queue the message for processing off this thread:
print("Start process_message ----------------")
self.do_process_message(body, message) if self.publish_on_callback else self.queue.put((body, message))#
print("End process_message ------------------")
def do_process_message(self, body: str, message: Message):
# Deserialize and "Process" the message:
print(f"Process message: {body}")
# ... msg processing code...
# Publish a processing output:
processing_output = self.get_processing_output()
print(f"Publishing processing output: {processing_output}")
self.rabbit_msg_transport.publish(Topics.ProcessingOutputs, processing_output)
# Acknowledge the message:
message.ack()
def run_message_loop(self):
while True:
print("Waiting for incoming message")
self.rabbit_connection.drain_events()
while not self.queue.empty():
body, message = self.queue.get(block=False)
self.do_process_message(body, message)
在上面的这个片段中, process_message 是回调。如果 publish_on_callback 是 True 你会在回调中看到递归 n deep for n message on rabbit queue。如果 publish_on_callback 是 False 它可以正确运行而无需在回调中递归。
另一种方法是为生产者交换使用第二个连接 - 与用于消费者的连接分开。这也有效,以便在再次为队列中的下一条消息触发回调之前完成使用消息和发布结果的回调。