1

我的项目中有多个公牛队列,NodeJS如果前一个队列成功执行,它们将运行。我正在尝试在这里验证一些电子邮件地址。

  1. 检查电子邮件格式(formatQueue)

  2. 使用包的电子邮件存在npm email-existence(existenceQueue)

formatQueue过程花费的时间更少,它将运行RegEx并验证Email格式。但该email-existence包大约需要 5-10 秒才能完成。

formatQueueexistenceQueue如果少于jobs20-100 ,则可以正常工作。但是当我一次添加超过 1000 个工作时,existenceQueue失败并出现以下错误

myemail@email.com job stalled more than allowable limit

我检查了HEREHERE的问题,我认为该过程需要很长时间才能响应,所以limiter按照HERE的引用添加。但这对我没有帮助。

如果任何队列中的作业失败,则不会处理下一个作业。它将停在那里,其他工作将保持waiting状态。

我的代码类似于下面的代码。请帮我解决这个问题。

队列.js

var formatQueue = new Queue('format', "redis-db-url");
var existenceQueue = new Queue('existence', "redis-db-url");

// ------------ function for adding to queue ------------
module.exports.addToQueue = (emails) => {
    emails.forEach(element => {
        formatQueue.add(element, { attempts: 3, backoff: 1000 });
    });
}

// ------------ Queue Process -------------

// Format Test Process
formatQueue.process(function(job, done){
    FormatTest.validate(job.data, (err, data) => {
        if(err) done();
        else{
            job.data = data;
            done();
        }
    });
});

// Existence Test Process
formatQueue.process(function(job, done){
    ExistenceTest.validate(job.data, (err, data) => {
        if(err) done();
        else{
            job.data = data;
            done();
        }
    });
});


// ------------ On Cmplete Handlers ------------
formatQueue.on('completed', function(job){
    if(job.data.is_well_format){
        existenceQueue.add(job.data, { attempts: 3, backoff: 1000 });
    }else QueueModel.lastStep(job.data)
});

existenceQueue.on('completed', function(job){
    QueueModel.lastStep(job.data)
});


// ------------ To update the emaile ------------
module.exports.lastStep = (data) => {
    Emails.updateEmail(data, (err, updated) => {
        if(!err) {
            formatQueue.clean('completed');
            existenceQueue.clean('completed');
        }
    })
}

- - - - - 更新 - - - - -

处理器花费了太多时间来响应,因此stalled由于我使用超时,因此工作正在获得或失败。

我正在尝试process在不同的processor文件中运行 itef 作为其在bull文档中,我添加了如下文件。

// -------- Queue.js ----------

formatQueue.process(__dirname+"/processors/format-worker.js");


// On Cmplete Handler

formatQueue.on('completed', function(job, result){
    console.log(result, "Format-Complete-job"); // result is undefined
    if(job.data.is_well_format){
        existenceQueue.add(job.data, { attempts: 3, backoff: 1000 });
    }else QueueModel.lastStep(job.data)
});

// -------- Queue.js ends ---------

//format-worker.js
Validator = require("../../validators");
module.exports = (job) => {
    Validator.Format.validate(job.data, (data) => {
        job.data = data;
        return Promise.resolve(data);
    });
}

现在在我之前使用的作业完成时,我曾经使用更新的作业参数获取作业数据。现在我没有得到更新的工作数据。文档中的第二个参数resultundefined. 现在在这种情况下如何获取更新的作业数据。

4

1 回答 1

2

尝试可重复的工作

var formatQueue = new Queue('format', "redis-db-url");
var existenceQueue = new Queue('existence', "redis-db-url");

// ------------ function for adding to queue ------------
module.exports.addToQueue = (emails) => {
  emails.forEach(element => {
    let jobOptions = {
      repeat: {
        every: 10 * 1000, // Run job every 10 seconds for example
        limit: 3 // Maximum number of times a job can repeat.
      },
      jobId: someUniqueId, // important do not forget this
      removeOnComplete: true, // removes job from queue on success (if required)
      removeOnFail: true // removes job from queue on failure (if required)
    }

    formatQueue.add(element, jobOptions);
  });
}

// ------------ Queue Process -------------

// Format Test Process
formatQueue.process(function (job, done) {
  FormatTest.validate(job.data, (err, data) => {
    if (err) {
      // Done with error
      done(true);
    } else {
      job.data = data;
      // Done without any error
      done(false);
    }
  });
});

// Existence Test Process
existenceQueue.process(function (job, done) {
  ExistenceTest.validate(job.data, (err, data) => {
    if (err) {
      // Done with error
      done(true);
    } else {
      job.data = data;
      // Done without any error
      done(false);
    }
  });
});


// ------------ On Complete Handlers ------------
formatQueue.on('completed', function (job) {
  if (job.data.is_well_format) {
    let jobOptions = {
      repeat: {
        every: 10 * 1000, // Run job every 10 seconds for example
        limit: 3 // Maximum number of times a job can repeat.
      },
      jobId: someUniqueId, // important do not forget this
      removeOnComplete: true, // removes job from queue on success (if required)
      removeOnFail: true // removes job from queue on failure (if required)
    }

    existenceQueue.add(job.data, jobOptions);
  } else QueueModel.lastStep(job.data)
});

existenceQueue.on('completed', function (job) {
  QueueModel.lastStep(job.data)
});


// ------------ To update the email ------------
module.exports.lastStep = (data) => {
  Emails.updateEmail(data, (err, updated) => {
    if (!err) {
      formatQueue.clean('completed');
      existenceQueue.clean('completed');
    }
  })
}

于 2020-05-06T14:45:52.257 回答