我需要使用 Amazon MQ 在 Nodejs 中使用 amqp 协议来消费和发布消息到队列。我已经设置了 AWS MQ,定义了代理并创建了一个队列。
我遵循了 AWS Javascript SDK,但仍然找不到任何方法来消费和发布消息到队列。
有人可以帮我如何使用 amqp 协议连接到 AWS MQ 来消费和发布消息到队列。
谢谢
我使用了 amqp10 npm 模块,该模块用于消费和发布消息到 AWS MQ。
下面是代码:
const AMQPClient = require('amqp10').Client;
const Policy = require('amqp10').Policy;
1.从 AWS MQ 消费消息:
let client = new AMQPClient(Policy.Utils.RenewOnSettle(1, 1,
Policy.ServiceBusQueue));
let connectionString = 'your_connnection_string';
client.connect(connectionString)
.then(function() {
console.log("Connected");
return Promise.all([
client.createReceiver(configurationHolder.config.getMessageQueueName)
]);
})
.spread(function(receiver) {
receiver.on('errorReceived', function(rx_err) {
console.warn('===> RX ERROR: ', rx_err);
return err;
});
receiver.on('message', function(message) {
client.disconnect().then(function() {
console.log('disconnected, when we get the message from the queue);
return message.body;
});
});
})
.error(function(e) {
console.warn('connection error: ', e);
return err;
});
将消息发布到 AWS MQ:
let client = new AMQPClient(Policy.merge({
senderLinkPolicy: {
callbackPolicy: Policy.Utils.SenderCallbackPolicies.OnSent
}
}, Policy.DefaultPolicy));
client.connect(connectionString, {
'saslMechanism': 'ANONYMOUS'
})
.then(function() {
console.log("Connected");
return Promise.all([
client.createSender(queueName)
]);
})
.spread(function(sender) {
sender.on('errorReceived', function(tx_err) {
console.warn('===> TX ERROR: ', tx_err);
return err;
});
var options = {
annotations: {
'x-opt-partition-key': 'pk' + msgValue
}
};
return sender.send(JSON.stringify(msgValue),
options).then(function(state) {
client.disconnect().then(function() {
console.log('disconnected, when we saw the value we
inserted after publish to AWS MQ.');
return state;
});
});
})
.error(function(e) {
console.warn('connection error: ', e);
return err;
});
谢谢