OpenStack 使用 RabbitMQ 作为消息传递系统。为此目的有几个交换和队列。我发现名为“nova”的“主题”类型的交换用于消息传输。Exchange 使用路由键将消息路由到队列(http://www.rabbitmq.com/tutorials/amqp-concepts.html)。 (有用的图片在http://www.rabbitmq.com/img/tutorials/intro/hello-world-example-routing.png - 没有足够的声誉在这里发布) OpenStack 中有几个队列,如计算、证书、网络等。他们使用同名的路由键。因此,我使用这些路由键创建了几个新队列,以将它们与处理消息的消费者绑定。例如,有一个名为“compute”的队列使用名为“compute”的路由键。我创建了使用相同路由键的新队列“my_compute”。因为我认为它应该工作,我会收到消息。
我有一些代码可以连接到交换,创建我的队列和消费者。
def connect(params):
connection = kombu.Connection(hostname=params['host'])
exchange = kombu.entity.Exchange(name=params['exchange_name'],
type=params['exchange_type'],
durable=params['exchange_durable'],
auto_delete=params['exchange_auto_delete'],
internal=params['exchange_internal'])
queue_list = []
for queue in params['queues_params']:
queue_list.append(kombu.messaging.Queue(name=queue['name'],
exchange=exchange,
routing_key=queue['routing_key'],
channel=connection.channel(),
durable=queue['durable'],
auto_delete=queue['auto_delete']))
consumer = kombu.messaging.Consumer(channel=connection.channel(),
queues=queue_list,
no_ack=True,
callbacks=[self._process_message])
consumer.consume()
return connection
参数“params”是从 json 文件中获取的映射:
{
"host" : "xxx",
"exchange_name" : "nova",
"exchange_type" : "topic",
"exchange_durable" : false,
"exchange_auto_delete" : false,
"exchange_internal" : false,
"queues_params" : [
{
"name" : "my_compute",
"routing_key" : "compute",
"durable" : false,
"auto_delete" : false,
"arguments" : [ ]
},
{
"name" : "my_network",
"routing_key" : "network",
"durable" : false,
"auto_delete" : false,
"arguments" : [ ]
},
.
.
.
它正在工作。但我只收到网络队列的消息。我不知道是否还有其他消息,但似乎有。我对吗?还是有什么不对?还有其他消息吗?我怎样才能得到它们?