我正在研究 NodeJS 和 RabbitMQ,其中 -
- 主 NodeJS 服务(一个实例)将数据推送到队列中。
- 从属 NodeJS 服务(多个实例)使用数据并对其进行处理。
- 正在使用默认交换。
从服务在 PM2 集群模式下运行,这意味着我有 8 个从服务运行的实例。
我的期望是,当主服务开始通过队列推送数据时,从服务应该开始异步消费它们。
例如,如果主服务正在通过队列推送 10 个作业,并且每个作业需要 5 秒才能完成,那么从属服务需要 50 秒才能完成该作业。
这完全违背了使用多个奴隶的目的,因为我通常希望奴隶一次接 8 个工作。
根据 RabbitMQ 仪表板,上述设置创建 -
- 9 个连接(1 个主站 + 8 个从站)
- 9通道(1主+8从)
- 1 个队列
整个设置使用默认交换。
我的问题是——
为什么从站不能异步从队列中读取数据?
即使我设置noAck
为true
队列中的下一个项目,在处理当前项目之前不会被拾取
我的意图是通过用户多个从属实例来扩大队列消耗率,但我认为我在这里遗漏了一些东西。
这是代码库 -
const rabbitMq = require("amqplib");
class RabbitMQClient {
async connect() {
this.connection = await rabbitMq.connect("amqp://admin:password@localhost");
this.channel = await this.connection.createChannel();
await this.channel.assertQueue("TEST_QUEUE");
}
}
// MASTER CODE (This Runs In Fork Mode - 1 Instance)
const master_client = new RabbitMQClient();
master_client.connect().then(() => {
// sending in 10 messages
for (let index = 1; index <= 10; index++) {
const data = Buffer.from(index.toString(), "utf8");
master_client.channel.sendToQueue("TEST_QUEUE", data);
}
});
// SLAVE CODE (This Runs In Cluster Mode - 8 Instances)
const slave_client = new RabbitMQClient();
// connect to rabbitmq
slave_client.connect().then(() => {
// consume the messages
slave_client.channel.consume("TEST_QUEUE", (data) => {
// timeout to add delay
setTimeout(() => {
RabbitMQClient._channel.ack(data);
}, 5000);
});
});
从机输出 -
33|slave | 2020-11-02 13:19:09.293 +00:00: recieved message - 1 (13-19-09)
34|slave | 2020-11-02 13:19:14.293 +00:00: recieved message - 2 (13-19-14)
35|slave | 2020-11-02 13:19:19.299 +00:00: recieved message - 3 (13-19-19)
36|slave | 2020-11-02 13:19:24.299 +00:00: recieved message - 4 (13-19-24)
37|slave | 2020-11-02 13:19:29.300 +00:00: recieved message - 5 (13-19-29)
38|slave | 2020-11-02 13:19:34.299 +00:00: recieved message - 6 (13-19-34)
39|slave | 2020-11-02 13:19:39.301 +00:00: recieved message - 7 (13-19-39)
40|slave | 2020-11-02 13:19:44.301 +00:00: recieved message - 8 (13-19-44)
33|slave | 2020-11-02 13:19:49.299 +00:00: recieved message - 9 (13-19-49)
34|slave | 2020-11-02 13:19:54.300 +00:00: recieved message - 10 (13-19-54)
如果您注意到,不同的从站正在以循环方式接收消息,但它们是同步工作的。
谢谢!