0

我无法通过 Kaazing JavaScript AMQP 库发布消息。下面的代码在 FANOUT 交换机上运行正常,但在 DIRECT 交换机上不起作用。我能够连接,但无法将消息发布到 DIRECT 交换机。

/**
 * Wires up the AMQP demo page.
 *
 * Calls load() (defined elsewhere), looks up the "console", "connect",
 * "disconnect", "send" and "clear" elements by id, and installs click
 * handlers that connect to the broker, open one publish channel and one
 * consume channel, publish a "new_bidder" message and tear the connection
 * down again. Progress is written to the on-page console element.
 */
function amqpTest() {
    load();

    var $ = function (id) { return document.getElementById(id); };

    // Named consoleEl so the global `console` object is not shadowed.
    var consoleEl = $("console");

    /**
     * Prepends a message to the on-page console, keeping at most 500 entries.
     * The message is inserted as a text node so it is never parsed as HTML.
     */
    var log = function (message) {
        var pre = document.createElement("pre");
        pre.style.wordWrap = "break-word";
        pre.appendChild(document.createTextNode(message));
        consoleEl.insertBefore(pre, consoleEl.firstChild);
        while (consoleEl.childNodes.length > 500) {
            consoleEl.removeChild(consoleEl.lastChild);
        }
    };

    var url = "ws://localhost:8000/amqp";
    var username = "guest";
    var password = "guest";
    var connect = $("connect");
    var disconnect = $("disconnect");
    var send = $("send");

    var consumeExchange = "demo_direct_exchange";
    var exchangeName = consumeExchange;
    var routingKey = "broadcastkey";

    // Placeholders; both are replaced with randomized names on every connect.
    var myQueueName = "clientqueue";
    var myConsumerTag = "clientkey";

    // Connection state. Declared here so the handlers below share it without
    // leaking implicit globals; all three stay undefined until connect runs.
    var amqp;
    var publishChannel;
    var consumeChannel;

    /** Handles a delivered message by dispatching on its UTF-8 body text. */
    var messageHandler = function (m) {
        alert("A");
        var body = m.body.getString(Charset.UTF8);

        if (body === "start_auction") {
            ss();
        }
        if (body === "new_bidder") {
            alert("New Bidder");
        }
    };

    /** Runs once the publish channel is open: declares the exchange. */
    var publishChannelOpenHandler = function (channel) {
        log("Opened Publish Channel");

        // Register listeners before issuing the declare so no event is missed.
        publishChannel.addEventListener("declareexchange", function () {
            log("Exchange Declared : " + exchangeName);
        });
        publishChannel.addEventListener("close", function () {
            log("Channel Closed : Publish Channel");
        });

        // NOTE(review): per the answer accompanying this snippet, this declare
        // had to be removed when the exchange already existed on the broker.
        // Presumably the broker rejects a redeclaration whose attributes
        // differ and closes the channel - confirm against the broker setup.
        publishChannel.declareExchange(exchangeName, "direct", false, false, false);
    };

    /** Runs once the consume channel is open: declares, binds and consumes the queue. */
    var consumeChannelOpenHandler = function (channel) {
        log("Opened Consumed Channel");

        consumeChannel.addEventListener("declarequeue", function () {
            log("Queue Declared : " + myQueueName);
        });
        consumeChannel.addEventListener("bindqueue", function () {
            log("QUEUE BOUND: " + exchangeName + " " + myQueueName);
        });
        consumeChannel.addEventListener("subscribe", function () {
            log("CONSUME: " + myQueueName);
        });
        consumeChannel.addEventListener("close", function () {
            log("CHANNEL CLOSED: consume channel");
        });
        consumeChannel.addEventListener("message", messageHandler);

        consumeChannel.declareQueue(myQueueName, false, false, false, false, false)
            .bindQueue(myQueueName, exchangeName, routingKey, false)
            .consumeBasic(myQueueName, myConsumerTag, false, true, true, false);
    };

    /** Runs once the connection is open: opens the publish and consume channels. */
    var openHandler = function () {
        log("CONNECTED");
        log("Open Publish Channel...");
        publishChannel = amqp.openChannel(publishChannelOpenHandler);
        log("Open Consume Channel...");
        consumeChannel = amqp.openChannel(consumeChannelOpenHandler);
    };

    connect.onclick = function () {
        log("Connecting: " + url + " " + username);
        myQueueName = "client" + Math.floor(Math.random() * 1000000);
        myConsumerTag = "client" + Math.floor(Math.random() * 1000000);

        var version = "0-9-0";
        amqp = new AmqpClient();
        amqp.addEventListener("close", function () {
            log("Disconnected");
        });

        amqp.connect(url, '/', {username: username, password: password}, version, openHandler);
    };

    disconnect.onclick = function () {
        log("DISCONNECT");
        // Nothing to disconnect if connect was never clicked.
        if (!amqp) {
            log("Not connected");
            return;
        }
        amqp.disconnect();
    };

    send.onclick = function () {
        log("SENDING MESSAGE ....");
        // The publish channel only exists after a successful connect.
        if (!publishChannel) {
            log("Cannot send: publish channel is not open");
            return;
        }
        var body = new ByteBuffer();
        body.putString("new_bidder", Charset.UTF8);
        body.flip();
        var headers = {};
        publishChannel.publishBasic(body, headers, consumeExchange, routingKey, false, false);
    };

    $("clear").onclick = function () {
        while (consoleEl.childNodes.length > 0) {
            consoleEl.removeChild(consoleEl.lastChild);
        }
    };
}

任何帮助将不胜感激。

4

1 回答 1

0

明白了……必须删除 declareExchange 的调用,因为该交换机已经存在。不过奇怪的是,这个错误只出现在 direct 交换机上,而 fanout 交换机不会出现。:)

于 2012-06-25T11:37:53.953 回答