我正在使用py-amqplib在 Python 中访问 RabbitMQ。应用程序会不时收到监听某些 MQ 主题的请求。
第一次收到这样的请求时,它会创建一个 AMQP 连接和一个通道,并启动一个新线程来监听消息:
connection = amqp.Connection(host = host, userid = "guest", password = "guest", virtual_host = "/", insist = False)
channel = connection.channel()
listener = AMQPListener(channel)
listener.start()
AMQPListener非常简单:
class AMQPListener(threading.Thread):
def __init__(self, channel):
threading.Thread.__init__(self)
self.__channel = channel
def run(self):
while True:
self.__channel.wait()
创建连接后,它订阅感兴趣的主题,如下所示:
channel.queue_declare(queue = queueName, exclusive = False)
channel.exchange_declare(exchange = MQ_EXCHANGE_NAME, type = "direct", durable = False, auto_delete = True)
channel.queue_bind(queue = queueName, exchange = MQ_EXCHANGE_NAME, routing_key = destination)
def receive_callback(msg):
self.queue.put(msg.body)
channel.basic_consume(queue = queueName, no_ack = True, callback = receive_callback)
第一次这一切都很好。但是,它在后续请求订阅另一个主题时失败。在后续请求中,我重新使用 AMQP 连接和 AMQPListener 线程(因为我不想为每个主题启动一个新线程)并且当我调用上面的代码块时,channel.queue_declare()方法调用永远不会返回。我还尝试在那时创建一个新频道,并且connection.channel()调用也永远不会返回。
我能够让它工作的唯一方法是为每个主题(即routing_key)创建一个新的连接、通道和侦听器线程,但这真的不理想。我怀疑是 wait() 方法以某种方式阻塞了整个连接,但我不确定该怎么做。当然,我应该能够使用单个侦听器线程接收具有多个路由键(甚至在多个通道上)的消息?
一个相关的问题是:当该主题不再感兴趣时,我如何停止侦听器线程?如果没有消息,channel.wait() 调用似乎会永远阻塞。我能想到的唯一方法是向队列发送一条“毒化”它的虚拟消息,即。被听者解释为停止的信号。