1

OpenStack 使用 RabbitMQ 作为消息传递系统。为此目的有几个交换和队列。我发现名为“nova”的“主题”类型的交换用于消息传输。Exchange 使用路由键将消息路由到队列(http://www.rabbitmq.com/tutorials/amqp-concepts.html)。 (有用的图片在http://www.rabbitmq.com/img/tutorials/intro/hello-world-example-routing.png - 没有足够的声誉在这里发布) OpenStack 中有几个队列,如计算、证书、网络等。他们使用同名的路由键。因此,我使用这些路由键创建了几个新队列,以将它们与处理消息的消费者绑定。例如,有一个名为“compute”的队列使用名为“compute”的路由键。我创建了使用相同路由键的新队列“my_compute”。因为我认为它应该工作,我会收到消息。

我有一些代码可以连接到交换,创建我的队列和消费者。

def connect(params):
connection = kombu.Connection(hostname=params['host'])
exchange = kombu.entity.Exchange(name=params['exchange_name'],
                                 type=params['exchange_type'],
                                 durable=params['exchange_durable'],
                                 auto_delete=params['exchange_auto_delete'],
                                 internal=params['exchange_internal'])
queue_list = []
for queue in params['queues_params']:
queue_list.append(kombu.messaging.Queue(name=queue['name'],
                                        exchange=exchange,
                                        routing_key=queue['routing_key'],
                                        channel=connection.channel(),
                                        durable=queue['durable'],
                                        auto_delete=queue['auto_delete']))
consumer = kombu.messaging.Consumer(channel=connection.channel(),
                                    queues=queue_list, 
                                    no_ack=True,
                                    callbacks=[self._process_message])
consumer.consume()
return connection

参数“params”是从 json 文件中获取的映射:

{
"host"                 : "xxx",
"exchange_name"        : "nova",
"exchange_type"        : "topic",
"exchange_durable"     : false,
"exchange_auto_delete" : false,
"exchange_internal"    : false,
"queues_params"        : [
    {
        "name"        : "my_compute",
        "routing_key" : "compute",
        "durable"     : false,
        "auto_delete" : false,
        "arguments"   : [ ]
    },
    {
        "name"        : "my_network",
        "routing_key" : "network",
        "durable"     : false,
        "auto_delete" : false,
        "arguments"   : [ ]
    },
    .
    .
    .

它正在工作。但我只收到网络队列的消息。我不知道是否还有其他消息,但似乎有。我对吗?还是有什么不对?还有其他消息吗?我怎样才能得到它们?

4

2 回答 2

0

这段代码在这个开发周期中正在经历一些积极的变化,但目前我断言你看起来有点深入。对于大多数 nova 组件,队列接口在这些组件使用的 RPC 公共库下被抽象出来,并选择主题和队列。

特别是,当 RPC 代码想要向特定的计算、网络或存储主机发送消息时,主题也可以是特定于主机的。您将在上面看到的唯一消息是一般广播消息,实际上通常是请求有关浮动 IP 地址的信息

如果您想要一个捕获队列中所有消息的示例,您绝对应该查看 Ceilometer,它旨在做到这一点,并利用嵌入到 nova 和相关组件中的通知系统。它不会为您提供拦截和解释新星、网络和煤渣消息所提供的相同功能——如果有用的话,这取决于您的总体目标。

于 2013-05-27T19:05:19.783 回答
0

在您的 queues_params 中的 routing_key 中,您指定了“网络”,这就是您只接收网络消息的原因。您可以使用通配符 routing_key "#" 捕获与相应主题交换关联的所有消息。如果你愿意的话,你可以看看我的笔记

于 2013-11-21T19:27:04.183 回答