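I think you should change your architecture a little. The WorkerNode is doing exactly what it is meant to do: it takes jobs from the queue and runs them.
So instead of blocking the queue somehow, you should only add the job to the queue once the user has responded (approved/canceled/failed) or 120 seconds have passed without a response.
If I understood you correctly, this should give you an idea of how to control a job across different requests: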
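// Assuming an Express app here, since the handlers use app.post/req/res.
const express = require('express')
const app = express()

// This is YOUR queue object. I don't know the implementation, but think
// of it like this..
const queue = new Queue()

// Holds the pending jobs that have not timed out yet and have not been
// explicitly approved/canceled/failed by the user, keyed by job id.
const waitingJobs = {}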
// This could be the place where the user calls the API to create a job.
app.post('/job', (req, res) => {
  // create the job as the user requested it
  const job = createJob(req)

  // Start a 120 second timeout for this waiting job.
  // If the user does not respond within that time, the job is
  // added to the queue automatically.
  const timeout = setTimeout(() => {
    queue.add(job)
    // drop the reference after adding so it can be garbage collected
    delete waitingJobs[job.id]
  }, 120 * 1000)

  // store the timeout on the job object!
  job.timeout = timeout

  // store the waiting job!
  waitingJobs[job.id] = job

  // respond to the user; send back the id so the client can make
  // another request if wanted.
  res.status(200).json({ message: 'Job created!', id: job.id })
})
app.post('/job/:id', (req, res) => {
  const id = req.params.id
  if (!id) {
    res.status(400).json('Bad job id provided')
    return
  }

  // get the waiting job:
  const job = waitingJobs[id]
  if (!job) {
    res.status(400).json('Job not found OR job already processed. Job id: ' + id)
    return
  }

  // get the action and validate it before touching the timeout, so a
  // bad request does not leave the job stuck without a timeout
  const action = req.query.action
  if (!action) {
    res.status(400).json('Bad action provided: ' + action)
    return
  }

  // now the user responded to a specific job, clear the
  // timeout first, so it won't be added to the queue automatically!
  if (job.timeout) {
    clearTimeout(job.timeout)
  }

  // Now the job won't be processed somewhere else!
  // you can do whatever you want...
  // example:
  if (action === 'APPROVE') {
    // job approved! add it to the queue so the worker node
    // can process it..
    queue.add(job)
  }
  if (action === 'CANCEL') {
    // do something else...
  }
  // etc..

  // ofc clear the job reference after you did something..
  delete waitingJobs[job.id]

  // since everything worked, inform the user that the action was handled
  res.status(200).json('Job ' + job.id + ' handled with action ' + action)
})
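
If you want to keep the timeout bookkeeping out of the route handlers, the same idea can be wrapped in a small helper. This is only a rough sketch: PendingJobs, add, take and onExpire are names I made up for illustration, and a plain array stands in for your real queue, which I don't know.

```javascript
// Hypothetical helper: holds jobs that are waiting for a user decision.
class PendingJobs {
  // onExpire(job) is called when nobody decided within timeoutMs.
  constructor(onExpire, timeoutMs = 120 * 1000) {
    this.onExpire = onExpire
    this.timeoutMs = timeoutMs
    this.jobs = new Map() // job id -> { job, timer }
  }

  // Start waiting for a decision on this job.
  add(job) {
    const timer = setTimeout(() => {
      this.jobs.delete(job.id)
      this.onExpire(job)
    }, this.timeoutMs)
    this.jobs.set(job.id, { job, timer })
  }

  // Remove the job and cancel its timer. Returns the job, or
  // undefined if the id is unknown, already taken, or expired.
  take(id) {
    const entry = this.jobs.get(id)
    if (!entry) return undefined
    clearTimeout(entry.timer)
    this.jobs.delete(id)
    return entry.job
  }
}

// Usage, with an array standing in for the real queue (assumption)
// and a short 50 ms timeout instead of 120 seconds.
const queued = []
const pending = new PendingJobs((job) => queued.push(job), 50)

pending.add({ id: 'a' })
pending.add({ id: 'b' })

// The user approves job 'a' before the timeout fires.
const approved = pending.take('a')
if (approved) queued.push(approved)

// A second request for the same id finds nothing.
console.log(pending.take('a')) // undefined
```

In the handlers above, pending.add(job) would take the place of the setTimeout plus waitingJobs bookkeeping in POST /job, and pending.take(id) would take the place of the lookup, clearTimeout and delete in POST /job/:id. Since take removes the entry in the same step, a job can only be handled once, either by the user or by the timeout.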