1

我正在构建一个同时在线拥有一百万用户的分析系统。我使用消息代理之类的 RabbitMQ 来减少服务器的容量

这是我的图表

在此处输入图像描述

我的系统包括 3 个组件。

发布服务器:(生产者) 这个系统是建立在 nodejs 上的。该系统的目的是将消息发布到queue

RabbitMQ 队列:该系统存储publisher server发送到的消息。之后,打开一个连接以从队列发送消息subscriber server

订阅服务器(消费者):该系统接收来自queue

发布服务器源代码

var amqp = require('amqplib/callback_api');
amqp.connect("amqp://localhost", function(error, connect) {
    if (error) {
        return callback(-1, null);
    } else {
       connect.createChannel(function(error, channel) {
       if (error) {
           return callback(-3, null);
       } else {
         var q = 'logs';
         var msg = data; // object
         // convert msg object to buffer 
         var new_msg = Buffer.from(JSON.stringify(msg), 'binary');

        channel.assertExchange(q, 'fanout', { durable: false });
        channel.publish(q, 'message_queues', new Buffer(new_msg));
       console.log(" [x] Sent %s", new_msg);
        return callback(null, msg);
      }
    });
   }
}); 

专门创建交换"message_queues""fanout"向所有消费者发送广播

订阅服务器源代码

var amqp = require('amqplib/callback_api');
amqp.connect("amqp://localhost", function(error, connect) {
    if (error) {
        console.log('111');
    } else {
        connect.createChannel(function(error, channel) {
            if (error) {
                console.log('1');
            } else {
                var ex = 'logs';

                channel.assertExchange(ex, 'fanout', { durable: false });
                channel.assertQueue('message_queues', { exclusive: true }, function(err, q) {
                    if (err) {
                        console.log('123');
                    } else {
                        console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q.queue);
                        channel.bindQueue(q.queue, ex, 'message_queues');

                        channel.consume(q.queue, function(msg) {
                            console.log(" [x] %s", msg.content.toString());

                        }, { noAck: true });

                    }
                });
            }
        });
    }

});

"message_queues"交易所接收消息

当我实现发送消息时。系统运行良好,但是我尝试了该系统的基准测试性能(每秒大约 1000 个用户发送请求),然后系统出现了一些问题。系统似乎过载/缓冲区溢出(或某些事情无法正常工作)。

两天前我才读到rabbitmq。我知道它的教程是基本示例,所以我需要帮助在现实世界中构建系统而不是......任何解决方案和建议

希望我的问题有意义

4

1 回答 1

3

你的问题很笼统。可能您应该提供更多详细信息以帮助识别瓶颈并帮助您。所以,首先我认为你应该检查rabbit mq - 它是否是瓶颈。有很多事情可能出错:

  1. 可以消费消息的消费者数量太少(我假设你使用一个消费者池)

  2. 网络太慢

  3. 队列和消息在 Rabbit MQ 的太多节点之间复制并执行磁盘(可以像这样使用 rabbit mq)

  4. 消费者无法真正处理消息,并且不断重新排队

因此,通常在您的测试期间,您应该检查 rabbit mq 并查看那里发生了什么。

消息一旦到达队列就处于就绪状态,一旦发生这种情况,它将一直存在,直到连接到队列的消费者之一不会尝试获取消息进行处理

当其中一个消费者(兔子在他们之间进行循环)选择要处理的消息时,如果消费者未能处理该消息,它的状态将变为Unacknowledged ,它将由兔子重新排队,以便另一个消费者有机会处理消息。

当然,如果消费者成功处理了消息,消息就会从rabbit mq服务器上消失。

假设您已经安装了 rabbit mq web ui(我强烈推荐它,尤其是对于初学者)——您可以直观地看到队列中发生了什么——您将看到有多少消息处于就绪状态,以及有多少消息未确认。这将有助于识别瓶颈。

例如 - 如果您看到通常只有一条消息处于未确认状态,这可能意味着消费者无法处理该消息并将其发送回 rabbit。另一方面,新消息总是从生产者到达,因此就绪消息的数量会非常快地增加。这也可以指出您只使用一个消费者,一次只能处理一条消息。所以你可以在这里考虑并行化,通过在不同的线程中运行许多消费者甚至集群你的应用程序(在兔子消费者可以驻留在不同的机器上)

当然,希望这总体上有所帮助,正如我之前所说,如果您有更具体的问题 - 请提供有关测试期间究竟发生了什么的更多信息

于 2016-12-28T09:19:23.197 回答