我正在将 BullMQ 与 Express 服务器配合使用来异步处理作业,但对如何从已完成的作业中检索结果感到困惑。
我目前正在做的是监听作业完成状态事件并将这些结果存储在一个以作业 ID 为键的对象中,并在我需要时从该对象中检索结果。有推荐的方法吗?
我查看了BullMQ 文档,但找不到任何有关如何检索结果的信息。
这是示例代码:
server.js:
// Kick off a new job by adding it to the work queue and reply with its id.
// A rejected `workQueue.add()` is forwarded to Express' error handling via
// `next(err)` instead of surfacing as an unhandled promise rejection.
app.post("/api/submitjob", async (req, res, next) => {
  try {
    const job = await workQueue.add();
    res.json({ id: job.id });
  } catch (err) {
    next(err);
  }
});
// Report the status of a previously submitted job.
// Responds 404 when the queue returns null for the id; otherwise replies with
// the job's id, state, progress, failure reason and result.
app.get("/api/jobstatus/:id", async (req, res, next) => {
  try {
    const id = req.params.id;
    const job = await workQueue.getJob(id);
    if (job === null) {
      res.status(404).end();
      return;
    }

    const state = await job.getState();
    // `progress` used to be an undeclared identifier, which threw a
    // ReferenceError for every job that was found. Read it from the job:
    // Bull exposes it as a getter method, BullMQ as a plain property.
    const progress =
      typeof job.progress === "function" ? job.progress() : job.progress;
    const reason = job.failedReason;
    // Prefer the return value stored on the job itself, so the result does not
    // depend on this process having seen the completion event. Fall back to
    // the in-memory map for backward compatibility.
    const result =
      job.returnvalue != null ? job.returnvalue : jobIdResultMap[id];

    res.json({ id, state, progress, reason, result });
  } catch (err) {
    // Forward queue failures to Express instead of leaving the rejection
    // unhandled.
    next(err);
  }
});
// Subscribe to the queue's global "completed" event: log the job id, then
// cache the JSON-decoded result in memory under that id.
workQueue.on('global:completed', function cacheCompletedResult(jobId, result) {
  logger.log('info', `${jobId} succesfully completed`);
  const parsedResult = JSON.parse(result);
  jobIdResultMap[jobId] = parsedResult;
});
// Start the HTTP server on PORT and print the endpoint URL once listening.
app.listen(PORT, () => {
  console.log(`✅ API Server started: http://${HOST}:${PORT}/api/v1/endpoint`);
});
worker.js:
let throng = require("throng");
let Queue = require("bull");
// Redis connection string: REDIS_URL from the environment when set (e.g. the
// Heroku-provided URL in production), otherwise a local instance.
const REDIS_URL = process.env.REDIS_URL || "redis://127.0.0.1:6379";

// Number of worker processes to spin up so that more CPU cores are used.
// See: https://devcenter.heroku.com/articles/node-concurrency for more info
const workers = process.env.WEB_CONCURRENCY || 2;

// Upper bound on jobs a single worker processes at once. Tune this for your
// application: jobs that mostly wait on network responses tolerate a much
// higher value, CPU-intensive jobs may need a much lower one.
const maxJobsPerWorker = 50;
/**
 * Returns a promise that resolves (with no value) after `ms` milliseconds.
 *
 * @param {number} ms - Delay passed to `setTimeout`.
 * @returns {Promise<void>}
 */
function sleep(ms) {
  return new Promise((done) => {
    setTimeout(done, ms);
  });
}
/**
 * Worker entry point: connects to the "work" queue at REDIS_URL and registers
 * the job processor, handling up to `maxJobsPerWorker` jobs at once.
 */
function start() {
  // Connect to the named work queue
  const workQueue = new Queue("work", REDIS_URL);

  workQueue.process(maxJobsPerWorker, async (job) => {
    // This is an example job that just waits briefly while doing no work.
    // Replace this with your own job logic.
    await sleep(50);

    // A job can return a value; it will be stored in Redis as JSON.
    return { value: "This will be stored" };
  });
}
// Hand the worker count and the `start` entry point to throng, which
// initializes the clustered worker processes.
// See: https://devcenter.heroku.com/articles/node-concurrency for more info
const clusterOptions = { workers, start };
throng(clusterOptions);