1

我有一个应用程序,在每次消息消费时,我都需要查询 MySQL 数据库以获取一些信息,并基于该进程消费消息。我想对此进行优化,以防止数据库上的多个查询增加负载。

我正在考虑一种方法,我至少等待x 消息y 秒。这样我可以批量消费一些消息,即使在某些时候我收到的消息较少,它们也会被消费。

示例:假设x = 100y = 10 秒

这意味着我等待至少 100 条消息或 10 秒,以先到者为准。这样,我可以一次查询数据库中的 100 条消息。此外,如果我收到的消息少于 100 条,剩余的消息将在最多 10 秒的窗口中处理。

我正在使用 NodeJS 和amqplib消费。我有以下基于 RabbitMQ 示例的代码:

amqp.connect('amqp://localhost', function(err, conn) {
  conn.createChannel(function(err, ch) {
    var q = 'hello';

    ch.assertQueue(q, {durable: false});
    console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q);
    ch.consume(q, function(msg) {
      console.log(" [x] Received %s", msg.content.toString());
    }, {noAck: true});
  });
});

我正在考虑拥有一个全局对象并在每个回调中添加它,并在它到达那些被处理的x 消息consume时检查该对象的计数。不过,不确定如何在此添加y 秒的时间上限,并确保如果我在时间窗口内收到少于x 条消息,则这些消息会得到处理

4

1 回答 1

0

以下代码将在每条接收到的消息之后调用一个函数,该函数将接收到的消息聚合到一个数组中。当它在没有消息(带参数null)的情况下被调用或当它看到消息计数已达到 时x,它会将聚合消息发送到数据库函数。否则,它只是将消息添加到数组中(在if语句的第二部分)。

该参数null由一个在y几秒钟后触发的计时器传递给聚合函数。此计时器在消息队列刚刚初始化时首先设置,并在聚合器将消息发送到数据库时重置。

var messageStore = [];
var timer;

sendToDatabase = function(messages) {...}

aggregate = function(msg) {
    if (msg == null || messageStore.push(msg) == x) {
        clearTimeout(timer);
        timer = setTimeout(aggregate, 1000*y, null);
        sendToDatabase(messageStore);
        messageStore = [];
    }
}

amqp.connect('amqp://localhost', function(err, conn) {
  conn.createChannel(function(err, ch) {
    var q = 'hello';

    ch.assertQueue(q, {durable: false});
    console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q);
    timer = setTimeout(aggregate, 1000*y, null);
    ch.consume(q, function(msg) {
      console.log(" [x] Received %s", msg.content.toString());
      aggregate(msg);
    }, {noAck: true});
  });
});

注意:我无法对此进行测试,因为我手头没有消息传递系统。

于 2018-12-16T21:36:15.443 回答