18

我有一个 python 工作程序客户端,它启动了 10 个工作程序,每个工作程序连接到一个 RabbitMQ 队列。有点像这样:

#!/usr/bin/python
worker_count=10

def mqworker(queue, configurer):
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='mqhost'))
    channel = connection.channel()
    channel.queue_declare(queue=qname, durable=True)
    channel.basic_consume(callback,queue=qname,no_ack=False)
    channel.basic_qos(prefetch_count=1)
    channel.start_consuming()


def callback(ch, method, properties, body):
    doSomeWork();
    ch.basic_ack(delivery_tag = method.delivery_tag)

if __name__ == '__main__':
    for i in range(worker_count):
        worker = multiprocessing.Process(target=mqworker)
        worker.start()

我遇到的问题是,尽管在通道上设置了 basic_qos,但第一个启动的工作人员接受了队列外的所有消息,而其他工作人员则坐在那里闲置。我可以在 rabbitmq 界面中看到这一点,即使我设置worker_count为 1 并将 50 条消息转储到队列中,所有 50 条消息都进入“未确认”存储桶,而我希望 1 变为未确认,而其他 49 条消息准备好。

为什么这不起作用?

4

1 回答 1

25

我似乎已经通过移动basic_qos调用的地方解决了这个问题。

将它放在之后channel = connection.channel()似乎会改变我所期望的行为。

于 2012-09-14T14:55:22.893 回答