0

我正在编写一个使用 rabbitmq 接收消息的节点 js 服务。但是当我尝试向我的节点 js 服务发送并发请求时,我遇到了一个问题。

这是我写的 amqp 订阅者,

const amqp = require('amqplib/callback_api')
let AmqpConnection = {
  // some other methods to make connection
  // ....
  //....



  subscribe: function(){
    this.withChannel((channel) => {
      let defaultQueueName = "my_queue";

      channel.assertQueue(defaultQueueName, { durable: true }, function(err, _ok) {
        if (err) throw err;
        channel.consume(defaultQueueName, AmqpConnection.processMessage);
        Logger.info("Waiting for requests..");
      });
    })       
  },

  processMessage: function(payload){
    debugger
    try {
      Logger.info("received"+(payload.content.toString()))
    }
    catch(error){
      Logger.error("ERROR: "+ error.message)
      //Channel.ack(payload)
    }
  }
}

现在我正在尝试使用发布者向此发布消息,

const amqp = require('amqplib/callback_api')
let Publisher = {
  // some other methods to make connection
  // ....
  // ....

  sendMessage: function(message){

    this.withChannel((channel) => {

      let exchangeName = 'exchange';
      let exchangeType = 'fanout';
      let defaultQueueName = 'my_queue';
      channel.assertExchange(exchangeName, exchangeType)
      channel.publish(exchangeName, defaultQueueName, new Buffer(message));
    })
  }
}

let invalidMsg = JSON.stringify({ "content": ""})
let correctMsg = JSON.stringify({ "content": "Test message"})


setTimeout(function () {
  for(let i=0; i<2; i++){
    Publisher.sendMessage(correctMsg)
    Publisher.sendMessage(invalidMsg)
  }
}, 3000)

但是当我同时执行发布者和订阅者时,我会在订阅者端得到以下输出

2017-02-18T11:27:55.368Z - info: received{"content":""}
2017-02-18T11:27:55.378Z - info: received{"content":""}
2017-02-18T11:27:55.379Z - info: received{"content":""}
2017-02-18T11:27:55.380Z - info: received{"content":""}

似乎并发请求正在覆盖收到的消息。有人可以帮忙吗?

4

0 回答 0