我们正在使用 mongodb-queue 进行一些处理,并且我们使用议程调度程序每 3 分钟运行一次作业,以从队列中获取消息并进行处理。我们观察到的问题是它无法按预期持续工作,有时消息会在队列中保留一段时间(甚至没有确认,意味着被拾取),然后一旦开始处理队列中的后续消息,它就会得到处理再次处理得更快,直到再次发生延迟。
如果您查看此已删除的时间戳,则顶部的最后三个事务的运行时间要晚于之前的事务,而它的处理时间应该比第 4 条记录晚 3 到 4 分钟。
在下面找到我们用来从队列中获取和处理的代码
module.exports = function (agenda_processing) {
var isStatic = false;
agenda_processing.disableRemoteMethodByName('updateAttributes', isStatic);
// agenda_processing = Object.assign(agenda_processing, httpManager);
isStatic = true;
agenda_processing.disableRemoteMethodByName('updateAll', isStatic);
agenda_processing.disableRemoteMethodByName('deleteById', isStatic);
agenda_processing.disableRemoteMethodByName('create', isStatic);
agenda_processing.disableRemoteMethodByName('upsert', isStatic);
agenda_processing.disableRemoteMethodByName('count', isStatic);
agenda_processing.disableRemoteMethodByName('findOne', isStatic);
agenda_processing.disableRemoteMethodByName('exists', isStatic);
agenda_processing.disableRemoteMethodByName('find', isStatic);
agenda_processing.disableRemoteMethodByName('findById', isStatic);
var jobsManager = ''
async function graceful() {
if (jobsManager) {
await jobsManager.stop();
}
setTimeout(() => {
process.exit(0);
}, 1000)
}
process.on('uncaughtException', graceful);
process.on("SIGTERM", graceful);
process.on("SIGINT", graceful);
//To deploy to dev commented out
setTimeout(() => {
setUpJobForProcessingQueue()
}, 3000)
function setUpJobForProcessingQueue () {
const dbUrl = config.spkmsdb.url
jobsManager = awbjobs.init(dbUrl, async () => {
await defineAndStartJobs()
})
//console.log(jobsManager)
};
async function defineAndStartJobs () {
let connector = agenda_processing.app.models.sites.getDataSource().connector.db
queue = processingqueue.initQueue(connector)
var jobNm = "processing-job"
jobsManager.define(jobNm, async function (job,done) {
try {
winstonLogger.info('Agenda: Entering the define callback')
await getDataFromQueueAndProcess(queue)
done()
winstonLogger.info('Agenda: Called done')
} catch(err) {
done(err)
}
}.bind(jobNm))
await jobsManager.every("180 seconds", jobNm) //3 minutes
await jobsManager.start()
winstonLogger.info('awb jobs have been set up')
}
}
议程详细如下:
const agenda = new Agenda({ db: { address: dbConStr ,options: { useUnifiedTopology: true, useNewUrlParser: true }}});
agenda.on('ready', onReady)
队列被内化为
queuename = 'processing-queue'
const queue = mongoDbQueue(db, queuename, {maxRetries:1, visibility:3600});
解决这种一致性的任何帮助都将非常有帮助。提前致谢。