3

我正在尝试process在不同的processor文件 itef 中运行它,因为它在bull 文档中,我添加了如下文件。

// -------- Queue.js ----------

formatQueue.process(__dirname + "/processors/format-worker.js");


// On Cmplete Handler

formatQueue.on('completed', function(job, result){
    console.log(result, "Format-Complete-job"); // result is undefined
    if(job.data.is_well_format){
        existenceQueue.add(job.data, { attempts: 3, backoff: 1000 });
    }else QueueModel.lastStep(job.data)
});

// -------- Queue.js ends ---------

//format-worker.js
Validator = require("../../validators");
module.exports = (job) => {
    Validator.Format.validate(job.data, (data) => {
        job.data = data;
        return Promise.resolve(data);
    });
}

现在在我之前使用的作业完成时,我曾经使用更新的作业参数获取作业数据。现在我没有得到更新的工作数据。文档中的第二个参数resultundefined. 现在在这种情况下如何获取更新的作业数据。

和工作正常,我运行job如下。processorsprocess

formatQueue.process(function(job, done){
    Validator.Format.validate(job.data, (data) => {
        job.data = data;
        done();
    });
});

在这种情况下,作业数据本身正在更新,并且效果也很好。

我有多个队列在做不同的工作,一旦前一个工作成功完成,那么只有我需要另一个工作来开始工作。我还有另一个问题,我已经提到了那里的用例。请在这里查看

4

1 回答 1

1

让我帮你,也给你一些提示。首先,您的代码中有一些错误。process 函数没有返回您在 validate 回调中创建的承诺。现在,我不知道Validator.Format.validate返回什么,所以为了安全起见,我会这样写:

module.exports = job => {
  return new Promise(resolve => {
    Validator.Format.validate(job.data, data => {
      resolve(data);
    });
  });
};

其次,一般来说,在流程处理程序本身而不是在“完成”事件回调中添加下一个作业更健壮,原因是这样做您将获得一个更健壮的解决方案,在添加下一个的情况下作业由于某种原因失败,作业将失败,您将能够重试或检查失败的原因等等。

module.exports = (job) => {
  return new Promise(resolve => {
    Validator.Format.validate(job.data, (data) => {
      if(data.is_well_format){
        resolve(existenceQueue.add(data, { attempts: 3, backoff: 1000 }));
      }else {
        resolve(QueueModel.lastStep(data))
      }
    });
  }
});
于 2021-07-16T12:25:04.160 回答