我有一个 python 工作程序客户端,它启动了 10 个工作程序,每个工作程序连接到一个 RabbitMQ 队列。有点像这样:
#!/usr/bin/python
worker_count=10
def mqworker(queue, configurer):
connection = pika.BlockingConnection(pika.ConnectionParameters(host='mqhost'))
channel = connection.channel()
channel.queue_declare(queue=qname, durable=True)
channel.basic_consume(callback,queue=qname,no_ack=False)
channel.basic_qos(prefetch_count=1)
channel.start_consuming()
def callback(ch, method, properties, body):
doSomeWork();
ch.basic_ack(delivery_tag = method.delivery_tag)
if __name__ == '__main__':
for i in range(worker_count):
worker = multiprocessing.Process(target=mqworker)
worker.start()
我遇到的问题是,尽管在通道上设置了 basic_qos,但第一个启动的工作人员接受了队列外的所有消息,而其他工作人员则坐在那里闲置。我可以在 rabbitmq 界面中看到这一点,即使我设置worker_count
为 1 并将 50 条消息转储到队列中,所有 50 条消息都进入“未确认”存储桶,而我希望 1 变为未确认,而其他 49 条消息准备好。
为什么这不起作用?