我的项目中有多个公牛队列,NodeJS
如果前一个队列成功执行,它们将运行。我正在尝试在这里验证一些电子邮件地址。
检查电子邮件格式(formatQueue)
使用包的电子邮件存在
npm email-existence
(existenceQueue)
该formatQueue
过程花费的时间更少,它将运行RegEx
并验证Email
格式。但该email-existence
包大约需要 5-10 秒才能完成。
formatQueue
existenceQueue
如果少于jobs
20-100 ,则可以正常工作。但是当我一次添加超过 1000 个工作时,existenceQueue
失败并出现以下错误
myemail@email.com job stalled more than allowable limit
我检查了HERE和HERE的问题,我认为该过程需要很长时间才能响应,所以limiter
按照HERE的引用添加。但这对我没有帮助。
如果任何队列中的作业失败,则不会处理下一个作业。它将停在那里,其他工作将保持waiting
状态。
我的代码类似于下面的代码。请帮我解决这个问题。
队列.js
var formatQueue = new Queue('format', "redis-db-url");
var existenceQueue = new Queue('existence', "redis-db-url");
// ------------ function for adding to queue ------------
module.exports.addToQueue = (emails) => {
emails.forEach(element => {
formatQueue.add(element, { attempts: 3, backoff: 1000 });
});
}
// ------------ Queue Process -------------
// Format Test Process
formatQueue.process(function(job, done){
FormatTest.validate(job.data, (err, data) => {
if(err) done();
else{
job.data = data;
done();
}
});
});
// Existence Test Process
formatQueue.process(function(job, done){
ExistenceTest.validate(job.data, (err, data) => {
if(err) done();
else{
job.data = data;
done();
}
});
});
// ------------ On Cmplete Handlers ------------
formatQueue.on('completed', function(job){
if(job.data.is_well_format){
existenceQueue.add(job.data, { attempts: 3, backoff: 1000 });
}else QueueModel.lastStep(job.data)
});
existenceQueue.on('completed', function(job){
QueueModel.lastStep(job.data)
});
// ------------ To update the emaile ------------
module.exports.lastStep = (data) => {
Emails.updateEmail(data, (err, updated) => {
if(!err) {
formatQueue.clean('completed');
existenceQueue.clean('completed');
}
})
}
- - - - - 更新 - - - - -
处理器花费了太多时间来响应,因此stalled
由于我使用超时,因此工作正在获得或失败。
我正在尝试process
在不同的processor
文件中运行 itef 作为其在bull
文档中,我添加了如下文件。
// -------- Queue.js ----------
formatQueue.process(__dirname+"/processors/format-worker.js");
// On Cmplete Handler
formatQueue.on('completed', function(job, result){
console.log(result, "Format-Complete-job"); // result is undefined
if(job.data.is_well_format){
existenceQueue.add(job.data, { attempts: 3, backoff: 1000 });
}else QueueModel.lastStep(job.data)
});
// -------- Queue.js ends ---------
//format-worker.js
Validator = require("../../validators");
module.exports = (job) => {
Validator.Format.validate(job.data, (data) => {
job.data = data;
return Promise.resolve(data);
});
}
现在在我之前使用的作业完成时,我曾经使用更新的作业参数获取作业数据。现在我没有得到更新的工作数据。文档中的第二个参数result
是undefined
. 现在在这种情况下如何获取更新的作业数据。