2

我正在尝试在 BullMQ 上创建一个结果队列,所有工作人员都可以发送结果添加具有特殊制作的 jobID 的作业。这个想法是,所有结果都会生成一个特定的 jobID,这样我就可以确切地知道结果是针对哪个进程的。

我已经按照文档getNextJob中的描述进行了尝试,但没有运气。

我找到的解决方案是使用queuEvents:每个进程在结果队列的状态上注册一个侦听器waiting,当具有它需要的 id 的作业到达时,该进程使用 a 获取作业getJob,读取结果数据并尝试移动作业完成。它有效,我可以正确获取工人产生的结果。

我遇到的问题是将结果作业移动到已完成状态,因为我无法使用 配置锁定令牌getJob并且我收到Missing lock for job 错误并且作业仍处于活动状态。

这是我在该过程中使用的(伪)代码


const jobID = "THE_ID_OF_THE_JOB_I_AM_WAITING_FOR";
const token = `${jobID}_results_worker`;
const queueEvents = new QueueEvents('results');
const resQueue = this.queues.get('results');
// I define a callback function to be able to remove the listener
const waitResult = async (job: {jobId: any}) => {
  if (job.jobId === jobID){
    debug(`Result job for ${jobID} received!`);
    const resJob = await resQueue?.getJob(jobID) as Job;
    queueEvents.removeListener('waiting', waitResult);
    // THIS GENERATES the error 
    resJob?.moveToCompleted('Results received', token, false);
    resolve(resJob?.data);
  }
} 
// Register the callback function on the queue
const listener = queueEvents?.on('waiting', waitResult );

有没有人知道如何正确处理moveToCompleted

4

1 回答 1

0

您可以像这样开发一个results队列:const queue_Results = new Queue('Results');,从那里您可以让工作人员像这样处理事件const worker_Results = new Worker('Results', async (job: Job) => { // do something with the results from other jobs })

BullMQ 关于该方法的文档,在这里

于 2021-10-05T03:13:59.743 回答