我有一个应用程序,在每次消息消费时,我都需要查询 MySQL 数据库以获取一些信息,并基于该进程消费消息。我想对此进行优化,以防止数据库上的多个查询增加负载。
我正在考虑一种方法,我至少等待x 消息或y 秒。这样我可以批量消费一些消息,即使在某些时候我收到的消息较少,它们也会被消费。
示例:假设x = 100,y = 10 秒
这意味着我等待至少 100 条消息或 10 秒,以先到者为准。这样,我可以一次查询数据库中的 100 条消息。此外,如果我收到的消息少于 100 条,剩余的消息将在最多 10 秒的窗口中处理。
我正在使用 NodeJS 和amqplib
消费。我有以下基于 RabbitMQ 示例的代码:
amqp.connect('amqp://localhost', function(err, conn) {
conn.createChannel(function(err, ch) {
var q = 'hello';
ch.assertQueue(q, {durable: false});
console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q);
ch.consume(q, function(msg) {
console.log(" [x] Received %s", msg.content.toString());
}, {noAck: true});
});
});
我正在考虑拥有一个全局对象并在每个回调中添加它,并在它到达那些被处理的x 消息consume
时检查该对象的计数。不过,不确定如何在此添加y 秒的时间上限,并确保如果我在时间窗口内收到少于x 条消息,则这些消息会得到处理