我正在学习 AMQP,所以这可能是对我正在做的事情的误解。我正在尝试在我自己的私人服务器上设置rabbitmq,以便为一组系统提供服务(我有一堆需要处理的图像)。我建立了一个三步管道:
| put image on queue | --work_queue--> | process | --results--> | archive results |
我安装了rabbitmq,并在服务器上创建了两个队列。“工作队列”和“结果”。我使用 amqplib 编写了一些 python 脚本,并且我的管道与一个图像处理器工作人员一起工作得很好。我已经向队列中添加了 100 张图像,而我的一台机器愉快地一次抓取一张并处理数据,并将结果放入结果队列中。
问题是,我假设如果我在另一台机器上启动另一个图像处理,它也会简单地将工作从队列中拉出。这似乎是 rabbitmq 网站上的“工作队列”教程中列出的确切情况。我希望这能正常工作。然而,真正发生的是,无论我启动多少其他工作人员,他们只是永远等待并且永远不会得到任何工作,即使在 work_queue 中有大量消息等待。
我误解了什么?在服务器上排队工作的相关代码:
from amqplib import client_0_8 as amqp
conn = amqp.Connection(host="foo:5672 ", userid="pipeline",
password="XXXXX", virtual_host="/", insist=False)
chan = conn.channel()
....
msg = { 'filename': os.path.basename(i) }
chan.basic_publish(amqp.Message(json.dumps(msg)), exchange='',
routing_key='work_queue')
以及流程工作者的消费者方面:
from amqplib import client_0_8 as amqp
conn = amqp.Connection(host="foo:5672 ", userid="pipeline",
password="XXXXX", virtual_host="/", insist=False)
chan = conn.channel()
def work_callback(msg):
....
while True:
chan.basic_consume(callback=work_callback, queue='work_queue')
try:
chan.wait()
except KeyboardInterrupt:
print "\nExiting cleanly"
sys.exit(0)
我可以看到其他工作人员已连接:
$ sudo rabbitmqctl list_queues
Listing queues ...
results 0
work_queue 246
...done.
$ sudo rabbitmqctl list_connections
Listing connections ...
pipeline 192.168.8.1 41553 running
pipeline XX.YY.ZZ.WW 46676 running
pipeline 192.168.8.4 44482 running
pipeline 192.168.8.6 41884 running
...done.
其中 XX.YY.ZZ.WW 是外部 IP。位于 192.168.8.6 的工作人员正在队列中忙碌,但外部 IP 上的工作人员闲置在那里,而 246 条消息在它应该等待的队列中等待。
想法?