187

我刚开始使用 RabbitMQ 和 AMQP。

  • 我有一个消息队列
  • 我有多个消费者,我想用相同的消息做不同的事情。

大多数 RabbitMQ 文档似乎都集中在循环,即单个消费者使用单个消息,负载在每个消费者之间分散。这确实是我目睹的行为。

一个例子:生产者有一个队列,每 2 秒发送一次消息:

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;

connection.on('ready', function () {
  var sendMessage = function(connection, queue_name, payload) {
    var encoded_payload = JSON.stringify(payload);  
    connection.publish(queue_name, encoded_payload);
  }

  setInterval( function() {    
    var test_message = 'TEST '+count
    sendMessage(connection, "my_queue_name", test_message)  
    count += 1;
  }, 2000) 


})

这是一个消费者:

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
connection.on('ready', function () {
  connection.queue("my_queue_name", function(queue){
    queue.bind('#'); 
    queue.subscribe(function (message) {
      var encoded_payload = unescape(message.data)
      var payload = JSON.parse(encoded_payload)
      console.log('Recieved a message:')
      console.log(payload)
    })
  })
})

如果我启动消费者两次,我可以看到每个消费者都在循环行为中消费替代消息。例如,我将在一个终端中看到消息 1、3、5,在另一个终端中看到 2、4、6

我的问题是:

  • 我可以让每个消费者收到相同的消息吗?即,两个消费者都收到消息 1、2、3、4、5、6?这在 AMQP/RabbitMQ 中叫什么?一般是怎么配置的?

  • 这通常是这样做的吗?我是否应该让交换将消息路由到两个单独的队列中,而只有一个消费者?

4

12 回答 12

144

我可以让每个消费者收到相同的消息吗?即,两个消费者都收到消息 1、2、3、4、5、6?这在 AMQP/RabbitMQ 中叫什么?一般是怎么配置的?

不,如果消费者在同一个队列中,则不会。来自 RabbitMQ 的AMQP 概念指南:

重要的是要了解,在 AMQP 0-9-1 中,消息在消费者之间是负载平衡的。

这似乎意味着队列中的循环行为是给定的,并且不可配置。即,为了让多个消费者处理相同的消息 ID,需要单独的队列。

这通常是这样做的吗?我是否应该让交换将消息路由到两个单独的队列中,而只有一个消费者?

不,不是,单个队列/多个消费者,每个消费者处理相同的消息 ID 是不可能的。让交换将消息路由到两个单独的队列中确实更好。

因为我不需要太复杂的路由,所以扇出交换会很好地处理这个问题。我之前并没有过多关注 Exchange,因为 node-amqp 具有“默认交换”的概念,允许您直接将消息发布到连接,但是大多数 AMQP 消息都会发布到特定的交换。

这是我的扇出交换,发送和接收:

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;

connection.on('ready', function () {
  connection.exchange("my_exchange", options={type:'fanout'}, function(exchange) {   

    var sendMessage = function(exchange, payload) {
      console.log('about to publish')
      var encoded_payload = JSON.stringify(payload);
      exchange.publish('', encoded_payload, {})
    }

    // Recieve messages
    connection.queue("my_queue_name", function(queue){
      console.log('Created queue')
      queue.bind(exchange, ''); 
      queue.subscribe(function (message) {
        console.log('subscribed to queue')
        var encoded_payload = unescape(message.data)
        var payload = JSON.parse(encoded_payload)
        console.log('Recieved a message:')
        console.log(payload)
      })
    })

    setInterval( function() {    
      var test_message = 'TEST '+count
      sendMessage(exchange, test_message)  
      count += 1;
    }, 2000) 
 })
})
于 2012-05-16T15:14:13.767 回答
58

最后几个答案几乎是正确的——我有大量的应用程序会生成消息,这些消息需要最终到达不同的消费者,所以这个过程非常简单。

如果您希望多个消费者接收同一条消息,请执行以下过程。

创建多个队列,一个用于接收消息的每个应用程序,在每个队列属性中,将路由标记与 amq.direct 交换“绑定”。更改您的发布应用程序以发送到 amq.direct 并使用路由标签(不是队列)。AMQP 然后将消息复制到具有相同绑定的每个队列中。奇迹般有效 :)

示例:假设我生成了一个 JSON 字符串,我使用路由标签“new-sales-order”将其发布到“amq.direct”交换,我有一个用于打印订单的 order_printer 应用程序的队列,我有一个排队等待我的计费系统,它将向客户发送订单副本并向客户开具发票,我有一个网络存档系统,我出于历史/合规原因存档订单,我有一个客户端网络界面,可以在其他信息进入时跟踪订单订单。

所以我的队列是:order_printer、order_billing、order_archive 和 order_tracking 都绑定了绑定标签“new-sales-order”,所有 4 个都将获取 JSON 数据。

这是在发布应用不知道或关心接收应用的情况下发送数据的理想方式。

于 2015-09-14T01:17:02.517 回答
37

只需阅读rabbitmq 教程。您发布消息以进行交换,而不是排队;然后将其路由到适当的队列。在您的情况下,您应该为每个消费者绑定单独的队列。这样,他们可以完全独立地使用消息。

于 2012-05-16T15:11:01.927 回答
8

是的,每个消费者都可以收到相同的消息。看看 http://www.rabbitmq.com/tutorials/tutorial-three-python.html http://www.rabbitmq.com/tutorials/tutorial-four-python.html http://www.rabbitmq。 com/tutorials/tutorial-5-python.html

用于路由消息的不同方式。我知道它们适用于 python 和 java,但理解原理、决定你在做什么然后找到如何在 JS 中执行它很好。听起来您想做一个简单的扇出(教程 3),它将消息发送到连接到交换的所有队列。

你在做什么和你想做的区别基本上是你要设置和交换或键入扇出。扇出交换将所有消息发送到所有连接的队列。每个队列都有一个消费者,可以分别访问所有消息。

是的,这很常见,它是 AMPQ 的功能之一。

于 2012-05-16T15:08:40.340 回答
7

发送模式是一对一的关系。如果你想“发送”给多个接收者,你应该使用 pub/sub 模式。有关详细信息,请参阅http://www.rabbitmq.com/tutorials/tutorial-three-python.html 。

于 2012-05-16T15:09:31.293 回答
3

RabbitMQ / AMQP:单个队列,同一消息和页面刷新的多个消费者。

rabbit.on('ready', function () {    });
    sockjs_chat.on('connection', function (conn) {

        conn.on('data', function (message) {
            try {
                var obj = JSON.parse(message.replace(/\r/g, '').replace(/\n/g, ''));

                if (obj.header == "register") {

                    // Connect to RabbitMQ
                    try {
                        conn.exchange = rabbit.exchange(exchange, { type: 'topic',
                            autoDelete: false,
                            durable: false,
                            exclusive: false,
                            confirm: true
                        });

                        conn.q = rabbit.queue('my-queue-'+obj.agentID, {
                            durable: false,
                            autoDelete: false,
                            exclusive: false
                        }, function () {
                            conn.channel = 'my-queue-'+obj.agentID;
                            conn.q.bind(conn.exchange, conn.channel);

                            conn.q.subscribe(function (message) {
                                console.log("[MSG] ---> " + JSON.stringify(message));
                                conn.write(JSON.stringify(message) + "\n");
                            }).addCallback(function(ok) {
                                ctag[conn.channel] = ok.consumerTag; });
                        });
                    } catch (err) {
                        console.log("Could not create connection to RabbitMQ. \nStack trace -->" + err.stack);
                    }

                } else if (obj.header == "typing") {

                    var reply = {
                        type: 'chatMsg',
                        msg: utils.escp(obj.msga),
                        visitorNick: obj.channel,
                        customField1: '',
                        time: utils.getDateTime(),
                        channel: obj.channel
                    };

                    conn.exchange.publish('my-queue-'+obj.agentID, reply);
                }

            } catch (err) {
                console.log("ERROR ----> " + err.stack);
            }
        });

        // When the visitor closes or reloads a page we need to unbind from RabbitMQ?
        conn.on('close', function () {
            try {

                // Close the socket
                conn.close();

                // Close RabbitMQ           
               conn.q.unsubscribe(ctag[conn.channel]);

            } catch (er) {
                console.log(":::::::: EXCEPTION SOCKJS (ON-CLOSE) ::::::::>>>>>>> " + er.stack);
            }
        });
    });
于 2014-05-14T10:39:15.443 回答
2

我评估你的情况是:

  • 我有一个消息队列(您接收消息的来源,我们将其命名为 q111)

  • 我有多个消费者,我想用相同的消息做不同的事情。

您的问题是,当此队列接收到 3 条消息时,消息 1 由消费者 A 使用,其他消费者 B 和 C 使用消息 2 和 3。您需要设置 rabbitmq 传递相同副本的地方所有这三个消息(1,2,3)同时发送给所有三个连接的消费者(A,B,C)。

虽然可以进行许多配置来实现这一点,但一种简单的方法是使用以下两步概念:

  • 使用动态rabbitmq-shovel 从所需队列(q111)中提取消息并发布到扇出交换(专门为此目的创建和专用的交换)。
  • 现在重新配置您的消费者 A、B 和 C(他们正在收听队列(q111))以使用每个消费者的专有和匿名队列直接收听此 Fanout 交换。

注意:当使用这个概念时,不要直接从源队列(q111)消费,因为已经消费的消息不会被铲到你的 Fanout 交换中。

如果您认为这不能满足您的确切要求...请随时发表您的建议 :-)

于 2014-08-12T06:59:08.627 回答
2

我认为您应该检查使用扇出交换器发送消息。这样,您将在 RabbitMQ 表下为不同的消费者接收相同的消息,为每个新的消费者/订阅者创建不同的队列。

这是在 javascript https://www.rabbitmq.com/tutorials/tutorial-one-javascript.html中查看教程示例的链接

于 2016-11-04T18:46:19.557 回答
1

如果您碰巧像我一样使用amqplib库,那么他们有一个方便发布/订阅 RabbitMQ 教程实现示例,您可能会发现它很方便。

于 2015-03-02T21:46:55.423 回答
1

要获得您想要的行为,只需让每个消费者从自己的队列中消费。您必须使用非直接交换类型(主题、标头、扇出)才能一次将消息发送到所有队列。

于 2012-11-01T20:11:57.867 回答
1

扇出显然是你想要的。fanout

阅读rabbitMQ教程: https ://www.rabbitmq.com/tutorials/tutorial-three-javascript.html

这是我的例子:

Publisher.js:

amqp.connect('amqp://<user>:<pass>@<host>:<port>', async (error0, connection) => {
    if (error0) {
      throw error0;
    }
    console.log('RabbitMQ connected')
    try {
      // Create exchange for queues
      channel = await connection.createChannel()
      await channel.assertExchange(process.env.EXCHANGE_NAME, 'fanout', { durable: false });
      await channel.publish(process.env.EXCHANGE_NAME, '', Buffer.from('msg'))
    } catch(error) {
      console.error(error)
    }
})

订阅者.js:

amqp.connect('amqp://<user>:<pass>@<host>:<port>', async (error0, connection) => {
    if (error0) {
      throw error0;
    }
    console.log('RabbitMQ connected')
    try {
      // Create/Bind a consumer queue for an exchange broker
      channel = await connection.createChannel()
      await channel.assertExchange(process.env.EXCHANGE_NAME, 'fanout', { durable: false });
      const queue = await channel.assertQueue('', {exclusive: true})
      channel.bindQueue(queue.queue, process.env.EXCHANGE_NAME, '')

      console.log(" [*] Waiting for messages in %s. To exit press CTRL+C");
      channel.consume('', consumeMessage, {noAck: true});
    } catch(error) {
      console.error(error)
    }
});

这是我在互联网上找到的一个例子。也许也可以提供帮助。 https://www.codota.com/code/javascript/functions/amqplib/Channel/assertExchange

于 2020-09-06T12:29:21.893 回答
1

在这种情况下,我在这里的答案中没有找到一个有趣的选项。

您可以在一个消费者中使用“重新排队”功能对消息进行 Nack,以便在另一个消费者中处理它们。一般来说,这不是正确的方法,但也许对某人来说已经足够了。

https://www.rabbitmq.com/nack.html

并当心循环(当所有消费者 nack+requeue 消息时)!

于 2018-12-13T12:26:08.767 回答