1

我正在学习 AMQP,所以这可能是对我正在做的事情的误解。我正在尝试在我自己的私人服务器上设置rabbitmq,以便为一组系统提供服务(我有一堆需要处理的图像)。我建立了一个三步管道:

| put image on queue | --work_queue--> | process | --results--> | archive results | 

我安装了rabbitmq,并在服务器上创建了两个队列。“工作队列”和“结果”。我使用 amqplib 编写了一些 python 脚本,并且我的管道与一个图像处理器工作人员一起工作得很好。我已经向队列中添加了 100 张图像,而我的一台机器愉快地一次抓取一张并处理数据,并将结果放入结果队列中。

问题是,我假设如果我在另一台机器上启动另一个图像处理,它也会简单地将工作从队列中拉出。这似乎是 rabbitmq 网站上的“工作队列”教程中列出的确切情况。我希望这能正常工作。然而,真正发生的是,无论我启动多少其他工作人员,他们只是永远等待并且永远不会得到任何工作,即使在 work_queue 中有大量消息等待。

我误解了什么?在服务器上排队工作的相关代码:

from amqplib import client_0_8 as amqp
conn = amqp.Connection(host="foo:5672 ", userid="pipeline",
    password="XXXXX", virtual_host="/", insist=False)
chan = conn.channel()

....

msg = { 'filename': os.path.basename(i) }

chan.basic_publish(amqp.Message(json.dumps(msg)), exchange='',
        routing_key='work_queue')

以及流程工作者的消费者方面:

from amqplib import client_0_8 as amqp
conn = amqp.Connection(host="foo:5672 ", userid="pipeline",
    password="XXXXX", virtual_host="/", insist=False)
chan = conn.channel()

def work_callback(msg):
.... 

while True:
    chan.basic_consume(callback=work_callback, queue='work_queue')
    try:
        chan.wait()
    except KeyboardInterrupt:
        print "\nExiting cleanly"
        sys.exit(0)

我可以看到其他工作人员已连接:

$ sudo rabbitmqctl list_queues
Listing queues ...
results 0
work_queue      246
...done.

$ sudo rabbitmqctl list_connections 
Listing connections ...
pipeline        192.168.8.1     41553   running
pipeline        XX.YY.ZZ.WW     46676   running
pipeline        192.168.8.4     44482   running
pipeline        192.168.8.6     41884   running
...done.

其中 XX.YY.ZZ.WW 是外部 IP。位于 192.168.8.6 的工作人员正在队列中忙碌,但外部 IP 上的工作人员闲置在那里,而 246 条消息在它应该等待的队列中等待。

想法?

4

1 回答 1

0

很抱歉回答我自己的问题,但我终于得到了我正在寻找的行为。我需要将 QoS prefetch_count 设置为 1。显然,通过不设置 prefetch_count,一旦一个客户端连接,所有 200 多条消息都会被传递到其“通道”并从队列中排出,因此除非其他人看到任何工作,否则原来的工作断开连接(因此关闭通道并将消息放回队列。

通过添加:

chan.basic_qos(0,1,False)

对我的员工来说,他们现在一次只能抓取一条消息。不完全是我所期望的,但它现在仍然按预期工作。

于 2013-08-31T01:09:27.993 回答