2

我需要使用 Amazon MQ 在 Nodejs 中使用 amqp 协议来消费和发布消息到队列。我已经设置了 AWS MQ,定义了代理并创建了一个队列。

我遵循了 AWS Javascript SDK,但仍然找不到任何方法来消费和发布消息到队列。

有人可以帮我如何使用 amqp 协议连接到 AWS MQ 来消费和发布消息到队列。

谢谢

4

1 回答 1

1

我使用了 amqp10 npm 模块,该模块用于消费和发布消息到 AWS MQ。

下面是代码:

  const AMQPClient = require('amqp10').Client;
  const Policy = require('amqp10').Policy;

1.从 AWS MQ 消费消息:

      let client = new AMQPClient(Policy.Utils.RenewOnSettle(1, 1, 
Policy.ServiceBusQueue));
      let connectionString = 'your_connnection_string';
      client.connect(connectionString)
          .then(function() {
              console.log("Connected");
              return Promise.all([

client.createReceiver(configurationHolder.config.getMessageQueueName)
              ]);
          })
          .spread(function(receiver) {
                  receiver.on('errorReceived', function(rx_err) {
                      console.warn('===> RX ERROR: ', rx_err);
                      return err;
                  });
                  receiver.on('message', function(message) {
                      client.disconnect().then(function() {
                      console.log('disconnected, when we get the message from the queue);
                      return message.body;
                  });
              });
          })
          .error(function(e) {
                  console.warn('connection error: ', e);
                  return err;
              });
  1. 将消息发布到 AWS MQ:

    let client = new AMQPClient(Policy.merge({
        senderLinkPolicy: {
            callbackPolicy: Policy.Utils.SenderCallbackPolicies.OnSent
        }
    }, Policy.DefaultPolicy));
    
    
        client.connect(connectionString, {
                'saslMechanism': 'ANONYMOUS'
            })
            .then(function() {
                console.log("Connected");
                return Promise.all([
                    client.createSender(queueName)
                ]);
            })
            .spread(function(sender) {
                sender.on('errorReceived', function(tx_err) {
                    console.warn('===> TX ERROR: ', tx_err);
                    return err;
                });
                var options = {
                    annotations: {
                        'x-opt-partition-key': 'pk' + msgValue
                    }
                };
                return sender.send(JSON.stringify(msgValue), 
    options).then(function(state) {
                    client.disconnect().then(function() {
                        console.log('disconnected, when we saw the value we 
    inserted after publish to AWS MQ.');
                        return state;
                    });
                });
            })
            .error(function(e) {
                console.warn('connection error: ', e);
                return err;
            });
    

谢谢

于 2018-07-15T12:59:30.053 回答