0

我们正在使用 mongodb-queue 进行一些处理,并且我们使用议程调度程序每 3 分钟运行一次作业,以从队列中获取消息并进行处理。我们观察到的问题是它无法按预期持续工作,有时消息会在队列中保留一段时间(甚至没有确认,意味着被拾取),然后一旦开始处理队列中的后续消息,它就会得到处理再次处理得更快,直到再次发生延迟。 在此处输入图像描述

如果您查看此已删除的时间戳,则顶部的最后三个事务的运行时间要晚于之前的事务,而它的处理时间应该比第 4 条记录晚 3 到 4 分钟。

在下面找到我们用来从队列中获取和处理的代码

module.exports = function (agenda_processing) {

    var isStatic = false;
    agenda_processing.disableRemoteMethodByName('updateAttributes', isStatic);
    // agenda_processing = Object.assign(agenda_processing, httpManager);
    isStatic = true;
    agenda_processing.disableRemoteMethodByName('updateAll', isStatic);
    agenda_processing.disableRemoteMethodByName('deleteById', isStatic);
    agenda_processing.disableRemoteMethodByName('create', isStatic);
    agenda_processing.disableRemoteMethodByName('upsert', isStatic);
    agenda_processing.disableRemoteMethodByName('count', isStatic);
    agenda_processing.disableRemoteMethodByName('findOne', isStatic);
    agenda_processing.disableRemoteMethodByName('exists', isStatic);
    agenda_processing.disableRemoteMethodByName('find', isStatic);
    agenda_processing.disableRemoteMethodByName('findById', isStatic);
    
    var jobsManager = ''
    
    async function graceful() {
        if (jobsManager) {
          await  jobsManager.stop();
        }
        setTimeout(() => {
            process.exit(0);
        }, 1000)
    }
        process.on('uncaughtException', graceful);
        process.on("SIGTERM", graceful);
        process.on("SIGINT", graceful);
    
    //To deploy to dev commented out
    setTimeout(() => {

        setUpJobForProcessingQueue()
    }, 3000)

    function setUpJobForProcessingQueue () {
        const dbUrl = config.spkmsdb.url
        jobsManager = awbjobs.init(dbUrl, async () => {
            await defineAndStartJobs()
        })
        //console.log(jobsManager)
    };

    async function defineAndStartJobs () {
        let connector = agenda_processing.app.models.sites.getDataSource().connector.db
        queue = processingqueue.initQueue(connector)
        var jobNm = "processing-job"
        jobsManager.define(jobNm, async function (job,done) {   
            try {
                winstonLogger.info('Agenda: Entering the define callback')
                await getDataFromQueueAndProcess(queue)
                done()
                winstonLogger.info('Agenda: Called done')
            } catch(err) {
                done(err)
            }
        }.bind(jobNm))
        await jobsManager.every("180 seconds", jobNm) //3 minutes
        await jobsManager.start()
        winstonLogger.info('awb jobs have been set up')
    }
}

议程详细如下:

const agenda = new Agenda({ db: { address: dbConStr ,options: { useUnifiedTopology: true, useNewUrlParser: true }}});
          agenda.on('ready', onReady)

队列被内化为

queuename =  'processing-queue'
const queue = mongoDbQueue(db, queuename, {maxRetries:1, visibility:3600});

解决这种一致性的任何帮助都将非常有帮助。提前致谢。

4

0 回答 0