我有以下情况。我想使用 amqp 客户端实现rabbitmq 的发布/订阅模型。我有订阅特定 ID 的订阅者。我想在该 ID 的名称上形成一个队列。这样所有想要订阅该特定 id 的订阅者都应该订阅该特定队列。因此,我想为具有该 ID 的所有订阅者基于特定 ID 形成一个唯一队列。但在我的情况下,它正在为相同的 id 形成不同的队列(我猜)。结果,特定 id 的订阅者能够以循环方式接收所有消息,尽管所有订阅者都接收到所有消息。下面是我使用的代码。
var express = require('express');
var amqp = require('amqp');
var app = express();
var httpServer = require('http').createServer();
var socketio = require('socket.io');
var io = socketio.listen(httpServer);
httpServer.listen(8000);
console.log("server started at port no 8000");
var rabbitConnection = amqp.createConnection({host:"localhost", port:"5672"});
var chatExchange;
rabbitConnection.on("ready", function()
{
console.log("RabbitConnection got ready");
chatExchange = rabbitConnection.exchange("chatExchange", {"type":"topic"});
//console.log("value of chatExchange is :",chatExchange);
});
io.sockets.on("connection", function(socket)
{
console.log("socket connected");
socket.on("metaData", function(data)
{
if(data.type=="publisher")
{
console.log("publisher connected");
socket.type="publisher";
}
else if(data.type=="subscriber")
{
console.log("subscriber connected");
socket.type="subscriber";
rabbitConnection.queue(""+data.channelName, function(queue) //MAIN PROBLEM LIES IN ABOVE LINE I GUESS. IT WILL FORM QUEUE FOR EVERY SOCKET CONNECTION. BUT IT SHOULD NOT FORM QUEUE IF IT ALREADY EXISTS.
{
console.log("queue formed");
console.log("queue is ", queue);
queue.bind("chatExchange",""+data.channelName);
queue.subscribe(function(message)
{
console.log("message is "+message.data.toString());
console.log("message emitted");
socket.emit("message",{"text":message.data.toString()});
});
});
}
});
socket.on("disconnect",function()
{
console.log("socket disconnected");
});
socket.on("message", function(data)
{
chatExchange.publish(""+data.channelName, data.text);
console.log("message came");
});
})
我看过 node-amqp 模块的文档。但我没有找到任何选项来检查队列的存在。有一些“持久性”选项。但我不认为它会起作用。请帮忙..提前谢谢..