我正在使用 Python、rabbitmq、pika、rabbit.js 和 node.js。
这个想法是向客户发送消息并采取相应的行动。有多种类型的消息正在发送。
所以,在服务器端,我有一个方法可以接收消息和交换将消息发送到:
import pika
def send(exchange, message):
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange=exchange, type='fanout',)
channel.basic_publish(exchange=exchange,
routing_key='',
body=message)
仍然在服务器端,使用 node.js,我有多个上下文,我相信交换会根据名称调用它们:(注意:“客户端”是指连接到服务器的客户端列表。)
var context = require('rabbit.js').createContext();
context.on('ready', function() {
var consumer_1 = context.socket('SUB');
var consumer_2 = context.socket('SUB');
consumer_1.setEncoding('utf8');
consumer_1.connect('consumer_1', function (){
consumer_1.on('data', function(data){
try {
io.sockets.emit("client_method_1", data);
}catch(err){
console.log("[CONSUMER 1 ERROR] " + err);
}
});
consumer_2.setEncoding('utf8');
consumer_2.connect('client_method_2', function (){
consumer_2.on('data', function(data){
try {
clients[data.client].emit('client_method_2', data.action );
}
catch(err) {
console.log("[CONSUMER 2 ERROR]: " + err);
}
});
});
这里发生的是 consumer_1 正在向所有客户端发送消息,而 consumer_2 正在向特定客户端发送消息,并且该部分运行良好。
问题是所有消费者都在接收来自所有交易所的消息。如果我尝试向 consumer_1 发送消息,consumer_2 也会收到该消息。
在实际方面,如果我打电话:
send('consumer_1', data)
结果将是:
- 节点将发射到“client_method_1”;
- 并且还将尝试发送到“client_method_2”。
我如何打电话给特定的消费者而不打电话给其他人?
或者,我应该只声明一个消费者并过滤数据以调用不同的方法吗?
编辑:
按照 TokenMacGuy 的建议,我尝试将交换类型更改为“主题”或“直接”。那应该正是我想要的。
但是总有一个但是,不知何故我无法重新声明交换。当我重新启动节点时,我不断收到此错误:
[(406, "PRECONDITION_FAILED - cannot redeclare exchange 'consumer_1' in vhost '/' with different type, durable, internal or autodelete value")]
好的,所以我检查了 rabbitmq 上的 list_exchanged 并且消费者仍然使用“ fanout ”。
direct
amq.direct direct
amq.fanout fanout
amq.headers headers
amq.match headers
amq.rabbitmq.log topic
amq.rabbitmq.trace topic
amq.topic topic
celery direct
celery.pidbox fanout
consumer_1 fanout
consumer_2 fanout
consumer_3 fanout
下一步是通过停止、重置(然后强制重置)和启动来重置 rabbitmq。
我已向 consumer_1 发送消息
现在 consumer_1 被列为“直接”:
direct
amq.direct direct
amq.fanout fanout
amq.headers headers
amq.match headers
amq.rabbitmq.log topic
amq.rabbitmq.trace topic
amq.topic topic
celery direct
celery.pidbox fanout
consumer_1 direct
consumer_2 fanout
consumer_3 fanout
好的,因为我只调用了 consumer_1 并且所有其他都出现在列表中,所以必须有一个包含该信息的文件。在哪里?我不知道!!
无论如何,consumer_1 上会发生相同的 PRECONDITION_FAILED 错误。
有任何想法吗?