我正在构建一个同时在线拥有一百万用户的分析系统。我使用消息代理之类的 RabbitMQ 来减少服务器的容量
这是我的图表
我的系统包括 3 个组件。
发布服务器:(生产者)
这个系统是建立在 nodejs 上的。该系统的目的是将消息发布到queue
RabbitMQ 队列:该系统存储publisher server
发送到的消息。之后,打开一个连接以从队列发送消息subscriber server
。
订阅服务器(消费者):该系统接收来自queue
发布服务器源代码
var amqp = require('amqplib/callback_api');
amqp.connect("amqp://localhost", function(error, connect) {
if (error) {
return callback(-1, null);
} else {
connect.createChannel(function(error, channel) {
if (error) {
return callback(-3, null);
} else {
var q = 'logs';
var msg = data; // object
// convert msg object to buffer
var new_msg = Buffer.from(JSON.stringify(msg), 'binary');
channel.assertExchange(q, 'fanout', { durable: false });
channel.publish(q, 'message_queues', new Buffer(new_msg));
console.log(" [x] Sent %s", new_msg);
return callback(null, msg);
}
});
}
});
专门创建交换
"message_queues"
以"fanout"
向所有消费者发送广播
订阅服务器源代码
var amqp = require('amqplib/callback_api');
amqp.connect("amqp://localhost", function(error, connect) {
if (error) {
console.log('111');
} else {
connect.createChannel(function(error, channel) {
if (error) {
console.log('1');
} else {
var ex = 'logs';
channel.assertExchange(ex, 'fanout', { durable: false });
channel.assertQueue('message_queues', { exclusive: true }, function(err, q) {
if (err) {
console.log('123');
} else {
console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q.queue);
channel.bindQueue(q.queue, ex, 'message_queues');
channel.consume(q.queue, function(msg) {
console.log(" [x] %s", msg.content.toString());
}, { noAck: true });
}
});
}
});
}
});
从
"message_queues"
交易所接收消息
当我实现发送消息时。系统运行良好,但是我尝试了该系统的基准测试性能(每秒大约 1000 个用户发送请求),然后系统出现了一些问题。系统似乎过载/缓冲区溢出(或某些事情无法正常工作)。
两天前我才读到rabbitmq。我知道它的教程是基本示例,所以我需要帮助在现实世界中构建系统而不是......任何解决方案和建议
希望我的问题有意义