0

我正在使用 Python、rabbitmq、pika、rabbit.js 和 node.js。

这个想法是向客户发送消息并采取相应的行动。有多种类型的消息正在发送。

所以,在服务器端,我有一个方法可以接收消息和交换将消息发送到:

import pika
def send(exchange, message):
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    channel.exchange_declare(exchange=exchange, type='fanout',)
    channel.basic_publish(exchange=exchange,
                          routing_key='',
                          body=message)

仍然在服务器端,使用 node.js,我有多个上下文,我相信交换会根据名称调用它们:(注意:“客户端”是指连接到服务器的客户端列表。)

var context = require('rabbit.js').createContext();

context.on('ready', function() {

    var consumer_1 = context.socket('SUB');
    var consumer_2 = context.socket('SUB');

    consumer_1.setEncoding('utf8');
    consumer_1.connect('consumer_1', function (){
    consumer_1.on('data', function(data){
        try {
            io.sockets.emit("client_method_1", data);
        }catch(err){
            console.log("[CONSUMER 1 ERROR] " + err);
        }

    });

    consumer_2.setEncoding('utf8');
    consumer_2.connect('client_method_2', function (){
    consumer_2.on('data', function(data){
        try {
            clients[data.client].emit('client_method_2', data.action );
        }
        catch(err) {
            console.log("[CONSUMER 2 ERROR]: " + err);
        }
    });

});

这里发生的是 consumer_1 正在向所有客户端发送消息,而 consumer_2 正在向特定客户端发送消息,并且该部分运行良好。

问题是所有消费者都在接收来自所有交易所的消息。如果我尝试向 consumer_1 发送消息,consumer_2 也会收到该消息。

在实际方面,如果我打电话:

send('consumer_1', data)

结果将是:

  • 节点将发射到“client_method_1”;
  • 并且还将尝试发送到“client_method_2”。

我如何打电话给特定的消费者而不打电话给其他人?

或者,我应该只声明一个消费者并过滤数据以调用不同的方法吗?

编辑:

按照 TokenMacGuy 的建议,我尝试将交换类型更改为“主题”或“直接”。那应该正是我想要的。

但是总有一个但是,不知何故我无法重新声明交换。当我重新启动节点时,我不断收到此错误:

[(406, "PRECONDITION_FAILED - cannot redeclare exchange 'consumer_1' in vhost '/' with different type, durable, internal or autodelete value")]

好的,所以我检查了 rabbitmq 上的 list_exchanged 并且消费者仍然使用“ fanout ”。

    direct
amq.direct  direct
amq.fanout  fanout
amq.headers headers
amq.match   headers
amq.rabbitmq.log    topic
amq.rabbitmq.trace  topic
amq.topic   topic
celery  direct
celery.pidbox   fanout
consumer_1    fanout
consumer_2    fanout
consumer_3    fanout

下一步是通过停止、重置(然后强制重置)和启动来重置 rabbitmq。

我已向 consumer_1 发送消息

现在 consumer_1 被列为“直接”:

    direct
amq.direct  direct
amq.fanout  fanout
amq.headers headers
amq.match   headers
amq.rabbitmq.log    topic
amq.rabbitmq.trace  topic
amq.topic   topic
celery  direct
celery.pidbox   fanout
consumer_1    direct
consumer_2    fanout
consumer_3    fanout

好的,因为我只调用了 consumer_1 并且所有其他都出现在列表中,所以必须有一个包含该信息的文件。在哪里?我不知道!!

无论如何,consumer_1 上会发生相同的 PRECONDITION_FAILED 错误。

有任何想法吗?

4

1 回答 1

0

fanout交换类型只有一种行为,将消息传递到绑定到它的每个队列。其他交换类型可以传递具有更窄标准的消息。最方便的可能是topic交换类型,它采用由通配符组成的绑定键,并导致所有匹配该通配符的消息被传递到那些特定的队列。(这里有一个详细的链接:http ://www.rabbitmq.com/tutorials/tutorial-five-python.html )

于 2013-04-10T12:30:31.653 回答