我正在尝试在 BullMQ 上创建一个结果队列,所有工作人员都可以发送结果添加具有特殊制作的 jobID 的作业。这个想法是,所有结果都会生成一个特定的 jobID,这样我就可以确切地知道结果是针对哪个进程的。
我已经按照文档getNextJob
中的描述进行了尝试,但没有运气。
我找到的解决方案是使用queuEvents
:每个进程在结果队列的状态上注册一个侦听器waiting
,当具有它需要的 id 的作业到达时,该进程使用 a 获取作业getJob
,读取结果数据并尝试移动作业完成。它有效,我可以正确获取工人产生的结果。
我遇到的问题是将结果作业移动到已完成状态,因为我无法使用 配置锁定令牌getJob
并且我收到Missing lock for job
错误并且作业仍处于活动状态。
这是我在该过程中使用的(伪)代码
const jobID = "THE_ID_OF_THE_JOB_I_AM_WAITING_FOR";
const token = `${jobID}_results_worker`;
const queueEvents = new QueueEvents('results');
const resQueue = this.queues.get('results');
// I define a callback function to be able to remove the listener
const waitResult = async (job: {jobId: any}) => {
if (job.jobId === jobID){
debug(`Result job for ${jobID} received!`);
const resJob = await resQueue?.getJob(jobID) as Job;
queueEvents.removeListener('waiting', waitResult);
// THIS GENERATES the error
resJob?.moveToCompleted('Results received', token, false);
resolve(resJob?.data);
}
}
// Register the callback function on the queue
const listener = queueEvents?.on('waiting', waitResult );
有没有人知道如何正确处理moveToCompleted
?