如果没有强制要求,则不需要创建 FANOUT 交换机。
您可以使用与其他交换机相同的路由键来创建一个 DIRECT 交换机。而且也不需要为新的交换机创建新的队列:您可以将现有队列与新交换机一起使用,只需要将新交换机与该队列绑定即可。
这是我的 receive.js 文件:
var amqp = require("amqplib/callback_api");
var crontab = require('node-crontab');
// Consumer demo: a single queue ('enterprise-11') is bound to two direct
// exchanges with the same routing key. The queue names the second exchange as
// its dead-letter exchange, so a message rejected without requeue is
// republished through that exchange and delivered to the same queue again.
amqp.connect("amqp://localhost", function (connErr, conn) {
    // Fail fast with the real cause instead of a TypeError on `conn` below.
    if (connErr) {
        throw connErr;
    }
    conn.createChannel(function (chErr, ch) {
        if (chErr) {
            throw chErr;
        }
        var ex = 'direct_logs';
        var ex2 = 'dead-letter-test';
        var severity = 'enterprise-1-key';
        // Assert the primary "direct_logs" exchange.
        ch.assertExchange(ex, 'direct', { durable: true });
        // Assert the "dead-letter-test" exchange (also of type direct).
        ch.assertExchange(ex2, 'direct', { durable: true });
        // The queue is declared with ex2 as its dead-letter exchange, so a
        // nack() without requeue hands the message over to "dead-letter-test".
        ch.assertQueue('enterprise-11', { exclusive: false, deadLetterExchange: ex2 }, function (qErr, q) {
            if (qErr) {
                throw qErr;
            }
            // Delivery counter shared by all messages: a delivery seen while
            // n is 0 is rejected, the following one is acked and resets n.
            var n = 0;
            console.log(' [*] Waiting for logs. To exit press CTRL+C');
            console.log(q);
            // Bind the queue to the "direct_logs" exchange.
            ch.bindQueue(q.queue, ex, severity);
            // Bind the same queue to "dead-letter-test" with the same key.
            ch.bindQueue(q.queue, ex2, severity);
            ch.consume(q.queue, function (msg) {
                // amqplib passes null when the server cancels the consumer;
                // there is nothing to log or acknowledge in that case.
                if (msg === null) {
                    console.warn('Consumer was cancelled by the server');
                    return;
                }
                if (msg.fields.exchange === ex2) {
                    // Message arrived via the dead-letter exchange: log it
                    // once per second.
                    // NOTE(review): this job is never cancelled, so every
                    // dead-lettered message adds another perpetual
                    // once-per-second logger - confirm this is intended.
                    crontab.scheduleJob("* * * * * *", function () {
                        console.log("Received by latest exchange %s", msg.fields.routingKey, msg.content.toString());
                    });
                } else {
                    console.log("Received %s", msg.fields.routingKey, msg.content.toString());
                }
                if (n < 1) {
                    // First delivery of a pair: reject this single message
                    // without requeue so it goes to the dead-letter exchange.
                    ch.nack(msg, false, false);
                    n += 1;
                } else {
                    // Second delivery of a pair: acknowledge and reset.
                    ch.ack(msg);
                    n = 0;
                }
            }, { noAck: false });
        });
    });
});