0

我正在研究 NodeJS 和 RabbitMQ,其中 -

  • 主 NodeJS 服务(一个实例)将数据推送到队列中。
  • 从属 NodeJS 服务(多个实例)使用数据并对其进行处理。
  • 正在使用默认交换。

从服务在 PM2 集群模式下运行,这意味着我有 8 个从服务运行的实例。

我的期望是,当主服务开始通过队列推送数据时,从服务应该开始异步消费它们。

例如,如果主服务正在通过队列推送 10 个作业,并且每个作业需要 5 秒才能完成,那么从属服务需要 50 秒才能完成该作业。

这完全违背了使用多个奴隶的目的,因为我通常希望奴隶一次接 8 个工作。

根据 RabbitMQ 仪表板,上述设置创建 -

  • 9 个连接(1 个主站 + 8 个从站)
  • 9通道(1主+8从)
  • 1 个队列

整个设置使用默认交换。

我的问题是——

为什么从站不能异步从队列中读取数据?

即使我设置noAcktrue队列中的下一个项目,在处理当前项目之前不会被拾取

我的意图是通过用户多个从属实例来扩大队列消耗率,但我认为我在这里遗漏了一些东西。

这是代码库 -

const rabbitMq = require("amqplib");

class RabbitMQClient {

    async connect() {
        this.connection = await rabbitMq.connect("amqp://admin:password@localhost");
        this.channel = await this.connection.createChannel();
        await this.channel.assertQueue("TEST_QUEUE");
    }
}


// MASTER CODE (This Runs In Fork Mode - 1 Instance)
const master_client = new RabbitMQClient();
master_client.connect().then(() => {
    // sending in 10 messages
    for (let index = 1; index <= 10; index++) {
        const data = Buffer.from(index.toString(), "utf8");
        master_client.channel.sendToQueue("TEST_QUEUE", data);
    }
});


// SLAVE CODE (This Runs In Cluster Mode - 8 Instances)
const slave_client = new RabbitMQClient();
// connect to rabbitmq
slave_client.connect().then(() => {
    // consume the messages
    slave_client.channel.consume("TEST_QUEUE", (data) => {
        // timeout to add delay
        setTimeout(() => {
            RabbitMQClient._channel.ack(data);
        }, 5000);
    });
});

从机输出 -

33|slave | 2020-11-02 13:19:09.293 +00:00: recieved message - 1  (13-19-09)
34|slave | 2020-11-02 13:19:14.293 +00:00: recieved message - 2  (13-19-14)
35|slave | 2020-11-02 13:19:19.299 +00:00: recieved message - 3  (13-19-19)
36|slave | 2020-11-02 13:19:24.299 +00:00: recieved message - 4  (13-19-24)
37|slave | 2020-11-02 13:19:29.300 +00:00: recieved message - 5  (13-19-29)
38|slave | 2020-11-02 13:19:34.299 +00:00: recieved message - 6  (13-19-34)
39|slave | 2020-11-02 13:19:39.301 +00:00: recieved message - 7  (13-19-39)
40|slave | 2020-11-02 13:19:44.301 +00:00: recieved message - 8  (13-19-44)
33|slave | 2020-11-02 13:19:49.299 +00:00: recieved message - 9  (13-19-49)
34|slave | 2020-11-02 13:19:54.300 +00:00: recieved message - 10 (13-19-54)

如果您注意到,不同的从站正在以循环方式接收消息,但它们是同步工作的。

谢谢!

4

1 回答 1

0

RabbitMQ 中的队列是单线程的,不能同时并行调度消息。但这并不意味着消费者没有同时进行处理。当一个消费者正在消费时,绑定到同一个队列的其他消费者可以同时收到消息并开始消费。

如何提高吞吐量

一般来说,以下提示总是有帮助的:

  • 将您的队列拆分到不同的核心上。你可以使用RabbitMQ Sharding插件,它可以自动对队列进行分片,也可以手动进行
  • 为每个消费者设置一个最优的预取计数,可以降低消费者获取消息的网络成本
  • 每个消费者 1 个频道。和队列一样,通道也是单线程的

有关更多详细信息,您可以访问这里

于 2020-11-04T04:58:26.693 回答