1

I want to be able to detect whether an exchange does not exist when submitting a message to AMQP.

Consider the following example.

#!/usr/bin/python

import amqp
from time import sleep

conn = amqp.Connection(host="localhost:5672", userid="guest", password="guest", virtual_host="/")
outgoing = conn.channel()
message = amqp.Message("x")


while True:
    print("publish message.")
    outgoing.basic_publish(message, exchange="non-existing", routing_key="fubar")
    sleep(1)

This script keeps publishing to the exchange but does not raise any errors if the exchange does not exist. When the exchange exists, the messages arrive.

#!/usr/bin/python

import amqp
from time import sleep

conn = amqp.Connection(host="localhost:5672", userid="guest", password="guest", virtual_host="/")
outgoing = conn.channel()
message = amqp.Message("x")


while True:
    print("publish message.")
    outgoing.basic_publish(message, exchange="non-existing", routing_key="fubar")
    outgoing.wait()
    sleep(1)

When I add outgoing.wait(), an amqp.exceptions.NotFound is raised, which is what I want. The problem is that when the exchange does exist, the message arrives but outgoing.wait() blocks my loop. (I could run outgoing.wait() in a separate thread, but I do not want to do that.)

How can I deal with this?

Any advice, tips, or pointers are welcome.

Thanks,

Jay


2 Answers

4

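If you want to find out whether an exchange exists, use the exchange_declare method with the passive flag set to True. Setting passive to True prevents the server from trying to create the exchange, and an error is raised if the exchange does not exist.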

import amqp
from amqp.exceptions import NotFound

conn = amqp.Connection(host="localhost:5672", userid="guest", password="guest",
                       virtual_host="/")
outgoing = conn.channel()
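try:
    # passive=True only checks for the exchange; it never creates it
    outgoing.exchange_declare("fubar", "", passive=True)
except NotFound:
    # the broker closes the channel on this error; open a new one to continue
    print("Exchange 'fubar' does not exist!")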
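Because a failed passive declare closes the channel it was issued on, it can help to run the check on a throwaway channel. The following is only a sketch under that assumption: exchange_exists and its not_found_exc parameter are hypothetical names introduced here, not part of the amqp library; the only library calls used are channel(), exchange_declare(), and close().

```python
def exchange_exists(conn, name, not_found_exc=None):
    """Return True if the exchange `name` exists on the broker, else False.

    `not_found_exc` is a hypothetical hook so that the exception class can
    be swapped out; by default amqp.exceptions.NotFound is used.
    """
    if not_found_exc is None:
        # Imported lazily so the helper only needs amqp when really used.
        from amqp.exceptions import NotFound as not_found_exc

    # Throwaway channel: a failed passive declare makes the broker close it,
    # so the channel used for publishing stays untouched.
    probe = conn.channel()
    try:
        probe.exchange_declare(name, "", passive=True)
    except not_found_exc:
        return False
    probe.close()
    return True
```

With the connection from the question, calling exchange_exists(conn, "fubar") once before the publish loop would tell you whether it is safe to start sending.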

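If what you are really interested in is making sure the exchange exists before you publish to it, simply declare it before entering your send loop. If the exchange already exists (with the same type and properties), nothing happens. If the exchange does not exist, it is created.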

import amqp

conn = amqp.Connection(host="localhost:5672", userid="guest", password="guest",
                       virtual_host="/")
outgoing = conn.channel()
outgoing.exchange_declare("fubar", "direct")

Here is a link to the declaration of the exchange_declare method in the amqp library you are using:

https://github.com/celery/py-amqp/blob/master/amqp/channel.py#L460-L461

answered 2013-03-01T18:54:50.700
1

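Unfortunately, you need a blocking call to check for exceptions from basic_publish(). What you can do, however, is run the blocking call once before entering the async loop: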

# Send a test message synchronously to see if the exchange exists.
# (outgoing, message, and sleep are defined as in the question's script.)
test_message = amqp.Message('exchange_test')
outgoing.basic_publish(test_message, exchange="non-existing", routing_key="fubar")
try:
    # Caveat from the question: wait() returns only when the broker sends
    # something back, so it blocks if the exchange does exist.
    outgoing.wait()
except amqp.exceptions.NotFound:
    # could not find the exchange, so do something about it
    exit()

while True:
    # fairly certain the exchange exists now, run the async loop
    print("publish message.")
    outgoing.basic_publish(message, exchange="non-existing", routing_key="fubar")
    sleep(1)
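One way to avoid blocking forever might be to bound the wait with a timeout. The sketch below assumes that the connection's drain_events() accepts a timeout in seconds and raises socket.timeout when nothing arrives, and that a pending NotFound from the broker surfaces from that call; check both against the amqp version you have installed. publish_checked is a hypothetical helper name, not a library function.

```python
import socket


def publish_checked(conn, channel, message, exchange, routing_key, timeout=0.1):
    """Publish, then wait at most `timeout` seconds for a broker error.

    Assumption: a channel error such as NotFound is raised while the
    connection processes incoming frames in drain_events().
    """
    channel.basic_publish(message, exchange=exchange, routing_key=routing_key)
    try:
        # Handles at most one incoming event; if it is a channel error,
        # the exception propagates to the caller.
        conn.drain_events(timeout=timeout)
    except socket.timeout:
        # Nothing came back in time: no error was reported for the publish.
        pass
```

In the loop from the question this would replace the basic_publish() and wait() pair. A timeout that is too short can miss the error on the current iteration, and drain_events() also dispatches any other pending events on the connection, so treat this as a best-effort check.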
answered 2013-02-24T07:51:39.573