I am developing a NodeJS application in which I use the kue module, backed by Redis, to manage a queue of jobs.
I am getting a "TTL exceeded" error. Because of this error the whole Redis job queue gets stuck: it does not restart on its own, and the remaining pending jobs in the queue never get executed.
According to the Kue documentation:
Job producers can set an expiry value for the time their job can remain active, so that if a worker does not respond in time, Kue fails the job with a "TTL exceeded" error message, preventing that job from being stuck in the active state and spoiling concurrency.
I don't know how to handle this situation. I am posting my code below and need help resolving the problem.
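My understanding of that mechanism, as a minimal sketch (the 'slow-import' job type and the timings are made up purely for illustration): the producer puts a .ttl() on the job, and if the worker has not called done() within that window, Kue fails the job with "TTL exceeded" instead of leaving it active. Some Kue releases also refresh the TTL whenever job.progress() is called from the handler, which can keep legitimately long jobs alive; check the behaviour of the installed version.

"use strict";
var kue = require('kue');
var queue = kue.createQueue();

// Producer: this job may only stay in the active state for 10 seconds.
queue.create('slow-import', { userId: 42 })
    .ttl(10000)
    .save();

// Worker: calls done() after 30 seconds, i.e. past the TTL, so Kue fails the
// job with a "TTL exceeded" error rather than leaving it stuck in the active
// state (the behaviour described in the documentation quote above).
queue.process('slow-import', (job, done) => {
    setTimeout(() => done(), 30000);
});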
"use strict";
const redisConst = require('../../constants/' + process.env.NODE_ENV + '.json').redis;
const DataConsolidationController = require('../../api/data-consolidation/controller/data-consolidation-controller');
const ContactController = require('../../api/contact/controller/contact-controller');
const GmailController = require('../../api/email/gmail/gmail-controller');
const FileUploadController = require('../../api/file-upload/controller/file-upload-controller');
var fs = require('fs');
var kue = require('kue');
/*
* If you have a huge concurrency in uncompleted jobs,
* turn this feature off and use queue level events for better memory scaling.
*/
var queue = kue.createQueue({
    prefix: 'qt',
    redis: redisConst,
    jobEvents: false,
    removeOnComplete: true
});
var job;
var _io;
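// Number of 'import' jobs this process handles in parallel (used by queue.process below).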
var concurrency = 5;
/* queue setting */
queue.on('ready', () => {
    console.info('Queue is ready!');
});
// A job was added to the queue
queue.on('job enqueue', (id, type) => {
    console.log('Job %s got queued', id);
});
// A completed job gets removed from Redis
queue.on('job complete', (id, result) => {
    kue.Job.get(id, (err, job) => {
        if (err)
            return;
        job.remove((err) => {
            if (err)
                throw err;
            console.log('Removed completed job #%d', job.id);
        });
    });
});
queue.on("job process", (id, result) => {
    kue.Job.get(id, (err, job) => {
        if (err)
            return;
        console.log("job process is done", job.id);
    });
});
queue.on('error', (err) => {
    // handle connection errors here
    console.error('There was an error in the main queue!');
    console.error(err);
    console.error(err.stack);
});
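// Re-queue jobs that appear stuck in the active state; an optional interval in
// milliseconds can be passed to watchStuckJobs, otherwise Kue's default is used.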
queue.watchStuckJobs();
process.once('SIGTERM', (sig) => {
    queue.shutdown(5000, (err) => {
        console.log('Kue shutdown: ', err || '');
        process.exit(0);
    });
});
/* workers */
queue.process('import', concurrency, (job, done) => {
    switch (job.data.type) {
        // File upload import
        case 'file-upload-import':
            FileUploadController.csvUploadWithQueueTechifyNew(_io.sockets.connected[job.data.socketId], job.data.filePath, job.data.userId)
                //FileUploadController.csvUploadWithQueue(_io.sockets.connected[job.data.socketId], job.data.filePath, job.data.userId)
                .then(result => {
                    // Delete the uploaded file once the import has finished,
                    // then run the recommendation engine for this user.
                    fs.unlinkSync(job.data.filePath);
                    return ContactController.recommendationEngine(null, job.data.userId);
                })
                // Signal completion exactly once, after both steps are done.
                .then(() => done())
                .catch(err => {
                    console.log(err);
                    done(err);
                });
            break;
    }
});
function findJobCount() {
    queue.activeCount((err, count) => {
        if (!err)
            console.log('**** Active: ', count);
    });
    queue.inactiveCount((err, count) => {
        if (!err)
            console.log('**** Inactive:', count);
    });
}
module.exports = class QueueController {
    static init(io) {
        _io = io;
    }
    /* producers */
    static createJob(name, data) {
        if (data.type === 'import-salesforce-data') {
            // Salesforce imports may run long, so allow 10 minutes in the active state.
            job = queue.create(name, data)
                .delay(1000)
                .ttl(600000)
                .attempts(1)
                .backoff(true)
                .removeOnComplete(true);
        } else {
            // All other imports get a 2-minute TTL.
            job = queue.create(name, data)
                .delay(1000)
                .ttl(120000)
                .attempts(1)
                .backoff(true)
                .removeOnComplete(true);
        }
        job
            .on('start', () => {
                console.log('Job', job.id, 'is now running');
                findJobCount();
            })
            .on('complete', () => {
                console.log('Job', job.id, 'is done');
                findJobCount();
            })
            .on('failed', () => {
                console.log('Job', job.id, 'has failed');
                job.remove();
                findJobCount();
            })
            .on("progress", () => {
                console.log("job", job.id, "is progressing");
            });
        job.save((err, result) => {
            if (err) {
                console.log('Error in adding Job: ' + err);
            } else {
                console.log("Job saved");
            }
        });
    }
}
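For context, this is roughly how the producer side is invoked from the rest of the application. The require path, socket id, file path and user id are made-up placeholders; the data shape mirrors what the 'import' worker above reads.

// Hypothetical caller of the controller above; all values are placeholders.
const io = require('socket.io')(3000); // or however the app creates its socket.io server
const QueueController = require('./queue-controller'); // assumed path

QueueController.init(io);
QueueController.createJob('import', {
    type: 'file-upload-import',    // routed by the switch inside the 'import' worker
    socketId: 'abc123',            // placeholder client socket id
    filePath: '/tmp/contacts.csv', // placeholder path of the uploaded CSV
    userId: 42                     // placeholder user id
});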