0

当我在不同的函数上调用 getJob 时,我需要一种方法来阻止工作人员处理工作。我环顾四周,但找不到解决方案。

我有以下设置。在带有 express 的 nodeJS 中,我有工作节点。

  1. 以延迟状态创建的作业。
  2. 正在以不同的功能访问作业
async function jobReader(id) {
   const job = await queue.getJob(id);
   /* do some stuff */
   await job.remove();
}
  1. 独立处理作业的工作节点。只有在延迟时间结束时才会处理作业。
queue.process(async (job) => {
   /* do some stuff */
})

queue.getJob(id)不会阻止工人处理工作。因此,处理工作的工人和处理工作的 jobReader 之间存在竞争。我正在根据工作状态将一些结果写入数据库。所以比赛条件是不可接受的。

显然, getJob 并没有阻止工作人员处理工作。如果作业是由具有 getJob 函数的其他函数读取的,是否有任何方法可以锁定或阻止工作人员的工作。

任何帮助或文档将不胜感激。

谢谢

4

1 回答 1

0

我想你应该稍微改变一下你的架构。WorkerNode完全按照它的意图去做,它接受jobs并运行它们。queue因此,您不应以某种方式阻止 ,而应仅在用户批准/取消/失败(或 120 秒后未发送响应)时job添加。queue

如果我理解正确,这应该让您了解如何控制不同请求之间的作业:

// this is YOUR queueu object. I don't now implentation but think 
// of it like this..
const queue = new Queue()

// a variable holding the pending jobs which are not timeouted
// or explicitly approved/canceled/failed by user
const waitingJobs = {

}


// This could be your location where the user calls the api for creating a job.
app.post('/job', (req, res) => {

    // create the job as the user requested it
    const job = createJob(req)

    // Add a timeout for 120 seconds into your waitingJobs array. 
    // So if the user does not respond after that time, the job will 
    // be added to queue! .
    const timeout = setTimeout(() => {

        queue.add(job)

        // remove the reference after adding, garbage collection..
        waitingJobs[job.id] = null

    // job is added to queue automatically after 120 seconds
    }, 120 * 1000)

    // store the timeout in the job object!
    job.timeout = timeout

    // store the waiting job!
    waitingJobs[job.id] = job

    // respond to user, send back id so client can do another 
    // request if wanted.
    req.status(200).json({ message: 'Job created!', id: job.id })
})


app.post('/job/:id', (req, res) => {

    const id = req.params.id

    if (!id) {
        req.status(400).json('bad job id provided')
        return
    }

    // get the queued job:
    const job = waitingJobs[id]

    if (!job) {
        req.status(400).json('Job nod found OR job already processed. Job id: ' + id)
        return
    }

    // now the user responded to a specific job, clean the 
    // timeout first, so it won't be added to queue!
    if (job.timeout) {
        clearTimeout(job.timeout)
    }

    // Now the job won't be processed somewhere else!
    // you can do whatever you want...
    // example:

    // get the action
    const action = req.query.action
    
    if(!action) {
        res.status(400).json('Bad action provided: ' + action)
        return
    }

    if(action === 'APPROVE') {
        // job approved! , add it to queue so worker node 
        // can process it..
        queue.add(job)
    }

    if(action === 'CANCEL') {
        // do something else...
    }
    /// etc..

    
    // ofc clear the job reference after you did something..
    waitingJobs[job.id] = null
    

    // since everything worked, inform user the job will now be processed!
    res.status(200).json('Job ' + job.id + 'Will now be processed')
})

于 2021-07-20T11:06:49.443 回答