我有一个使用 RabbitMQ 和 NodeJS 进行图像处理的进程。由于任务繁重,我想我和这里的链接有同样的问题https://github.com/squaremo/amqp.node/issues/261
我试图弄清楚如何实施对该问题的最后评论。
“是的。NodeJS 是单线程的,如果您长时间使用该线程执行某项操作,则不会发生其他任何事情。正如@michaelklishin 所建议的那样,此一般问题的已知解决方案是使用子进程或集群模块。”
编辑:
我用我认为如何使用 amqp-connection-manager 模块执行此操作的示例更新了下面的代码。现在我使用一个全局变量来保存能够确认的实际消息。我猜有更好的方法来做到这一点。
//Used to be an example for how to keep the connection thread and the working thread separate
//in order to fix the issue of missing heartbeat intervals due to processing on the same thread
const cluster = require('cluster');
var amqp = require('amqp-connection-manager');
var config = require('./config.json');
var working_queue = "Test_Queue";
//DONT REALLY DO THIS
var rabbit_msg_data;
//******* CLUSTER SETUP *******
// This will spawn off the number of worker for this process.
if (cluster.isMaster) {
console.log("Master "+process.pid+" is running");
worker = cluster.fork();
cluster.on('exit', (worker, code, signal) => {
if(signal)
{
console.log("worker was killed by signal: "+signal);
console.log(worker);
}
else if (code !== 0)
{
console.log("worker exited with error code: "+code);
console.log(worker);
}
else
{
console.log("Worker "+worker.process.pid+" exited successfully");
console.log(worker);
//Not sure if this works this way or if I need to put this worker into variables
}
});
//testing sending a message back and forth
// setTimeout(function() {
// worker.send("I got a request!");
// }, 1000);
//******** RABBIT MQ CONNECTION **********
// Create a connection manager to rabbitmq
var connection = amqp.connect(config.rabbit_connections_arr, {json: true, heartbeatIntervalInSeconds: 2});
connection.on('connect', function() {
console.log('Connected to rabbitmq');
});
connection.on('disconnect', function(params) {
console.log('Disconnected from rabbitmq:', params.err.stack);
});
// Set up a channel listening for messages in the queue.
var channelWrapper_listening = connection.createChannel({
setup: function(channel) {
// `channel` here is a regular amqplib `ConfirmChannel`.
return Promise.all([
channel.assertQueue(working_queue, {durable: true}),
channel.prefetch(1),
channel.consume(working_queue, function(data){
rabbit_msg_data = data;
worker.send(data.content.toString());
}, requeue = false)
]);
}
});
worker.on('message', function(msg){
// console.log("Worker to Master (ack): ", msg.content.toString());
console.log("Worker to Master (ack): ", msg);
//console.log(msg.content.toString());
channelWrapper_listening.ack(rabbit_msg_data);
});
}
else //All worker processes (MAIN LOGIC)
{
console.log("Worker "+process.pid+" started");
process.on('message',function(msg){
console.log("Master to Worker (working): ", msg);
//send msg back when done working on it.
setTimeout(function() {
process.send(msg);
}, 5000);
});
}