0

我正在开发 NodeJS 应用程序,在该应用程序中,我使用 Redis 的 kue 模块来管理队列以执行任务。

出现“超出 TTL”错误。由于这个错误,整个 redis 任务队列已经被 stueyeck 并且它可能不会自动启动队列以及不允许执行队列中的其他待处理任务。

根据 Kue 文档:

作业生产者可以为其作业可以处于活动状态的时间设置一个到期值,这样如果工作人员没有及时回复,Kue 将失败并显示 TTL 超出错误消息,防止该作业卡在活动状态并破坏并发。

我不知道如何处理这种情况。我发布了我的代码,我需要帮助来解决问题。

"use strict";
const redisConst = require('../../constants/' + process.env.NODE_ENV + '.json').redis;
const DataConsolidationController = require('../../api/data-consolidation/controller/data-consolidation-controller');
const ContactController = require('../../api/contact/controller/contact-controller');
const GmailController = require('../../api/email/gmail/gmail-controller');
const FileUploadController = require('../../api/file-upload/controller/file-upload-controller');
var fs = require('fs');
var kue = require('kue');
/*
* If you have a huge concurrency in uncompleted jobs,
* turn this feature off and use queue level events for better memory scaling.
*/
var queue = kue.createQueue({
    prefix: 'qt',
    redis: redisConst,
    jobEvents: false,
    removeOnComplete: true
});
var job;
var _io;
var concurrency = 5;

/* queue setting */
queue.on('ready', () => {
    console.info('Queue is ready!');
});

// A job get executed
queue.on('job enqueue', (id, type) => {
    console.log('Job %s got queued', id);
});

// A job get removed
queue.on('job complete', (id, result) => {
    kue.Job.get(id, (err, job) => {
        if (err)
            return;
        job.remove((err) => {
            if (err)
                throw err;
            console.log('Removed completed job #%d', job.id);
        });
    });
});

queue.on("job process", (id, result) => {
    kue.Job.get(id, (err, job) => {
        if (err)
            return;
        console.log("job process is done", job.id);
    });
})

queue.on('error', (err) => {
    // handle connection errors here
    console.error('There was an error in the main queue!');
    console.error(err);
    console.error(err.stack);
});

queue.watchStuckJobs();

process.once('SIGTERM', (sig) => {
    queue.shutdown(5000, (err) => {
        console.log('Kue shutdown: ', err || '');
        process.exit(0);
    });
});


/* workers */
queue.process('import', concurrency, (job, done) => {
    switch (job.data.type) {
        // File upload import
        case 'file-upload-import':
            FileUploadController.csvUploadWithQueueTechifyNew(_io.sockets.connected[job.data.socketId], job.data.filePath, job.data.userId)
                //FileUploadController.csvUploadWithQueue(_io.sockets.connected[job.data.socketId], job.data.filePath, job.data.userId)
                .then(result => {
                    fs.unlinkSync(job.data.filePath);
                    done();
                })
                .then(result => {
                    ContactController.recommendationEngine(null, job.data.userId);
                    done();
                })
                .catch(err => {
                    console.log(err);
                    done(err);
                });
            break;
    }
});

function findJobCount() {
    queue.activeCount((err, count) => {
        if (!err)
            console.log('**** Active: ', count);
    });
    queue.inactiveCount((err, count) => {
        if (!err)
            console.log('**** Inactive:', count);
    });
}

module.exports = class QueueController {
    static init(io) {
        _io = io;
    }

    /* producers */
    static createJob(name, data) {
        if (data.type === 'import-salesforce-data') {
            job = queue.create(name, data)
                .delay(1000)
                .ttl(600000)
                .attempts(1)
                .backoff(true)
                .removeOnComplete(true)
                .save((err) => {
                    if (err) {
                        console.error(err);
                        done(err);
                    } else if (!err) {
                        done();
                    }
                });
        } else {
            job = queue.create(name, data)
                .delay(1000)
                .ttl(120000)
                .attempts(1)
                .backoff(true)
                .removeOnComplete(true);
        }

        job
            .on('start', () => {
                console.log('Job', job.id, 'is now running');
                findJobCount();
            })
            .on('complete', () => {
                console.log('Job', job.id, 'is done');
                findJobCount();
            })
            .on('failed', () => {
                console.log('Job', job.id, 'has failed');
                job.remove();
                findJobCount();
            })
            .on("progress", () => {
                console.log("job", job.id, "is progressing");
            });

        job.save((err, result) => {
            if (err) {
                console.log('Error in adding Job: ' + err);
            } else {
                console.log("Job saved");
            }
        });
    }
}
4

1 回答 1

1

超出 TTL 意味着您的工作未在 TTL 内完成。请检查为什么您的工作没有在 TTL 内完成
根据我对您的代码的理解 - 在 queue.process 中,所有 data.type 都未处理,因此永远不会调用 done 并且作业长时间保持活动状态并最终 ttl 过期。

此外,您的 ttl 为 5 分钟和 10 分钟,因此每个工作保持活动状态 5 或 10 分钟,因此不会长时间给其他工作机会。尽可能减少 TTL。

由于您有 5 个并发意味着您的所有 5 个并发作业都处于活动和卡住状态,因此无法安排其他作业。如果可能,增加并发性。

您还可以使用 kue-ui-express 获取您的工作及其状态的 GUI 表示。

而且,如果 kue 中有许多作业等待执行,kue 可能会溢出一些作业。

于 2017-10-17T19:43:53.947 回答