我正在使用 kombu 通过生产者/消费者模型来管理 RabbitMQ。我启动了我的生产者,它在一个队列中放置了 100 个作业(我只有一个队列和一个交换器)。我想同时启动多个消费者,并让每个消费者一次处理一项工作。不幸的是,消费者相互阻塞(即,当一个消费者从队列中获取工作时,其他消费者只是闲置)。如果我杀死了正在工作的消费者,那么其他消费者之一就会介入并开始工作。有没有办法让所有消费者同时运行,每个消费者处理队列中的不同作业?我的消费者代码如下:
def start_consumer(self, incoming_exchange_name):
if self.rabbitmq_connection.connected:
callbacks=[]
queues=[]
callbacks.append(self._callback)
queues.append(self.incoming_queue)
print 'opening a new *incoming* rabbitmq connection to the %s exchange for the %s queue' % (self.incoming_exchange.name, self.incoming_queue.name)
self.incoming_exchange(settings.rabbitmq_connection).declare()
self.incoming_queue(settings.rabbitmq_connection).declare()
with settings.rabbitmq_connection.Consumer(queues=queues, callbacks=callbacks) as consumer:
while True:
try:
self.rabbitmq_connection.drain_events()
except Exception as e:
print 'Error -> %s' % e.message