
I'm using BullMQ with an Express server to process jobs asynchronously, but I'm confused about how to retrieve results from completed jobs.

What I'm currently doing is listening for the job-completed event and storing the results in an object keyed by job ID, then reading them back from that object when I need them. Is there a recommended way to do this?

I've looked through the BullMQ documentation but couldn't find anything about how to retrieve results.

Here is the sample code:

server.js

let express = require("express");
let Queue = require("bull");

let app = express();

let PORT = process.env.PORT || 3000;
let REDIS_URL = process.env.REDIS_URL || "redis://127.0.0.1:6379";

// Connect to the named work queue
let workQueue = new Queue("work", REDIS_URL);

// Results cache keyed by job ID, filled in by the global:completed listener below
let jobIdResultMap = {};

// Kick off a new job by adding it to the work queue
app.post("/api/submitjob", async (req, res) => {
  let job = await workQueue.add({});
  res.json({ id: job.id });
});

app.get("/api/jobstatus/:id", async (req, res) => {
  let id = req.params.id;
  let job = await workQueue.getJob(id);

  if (job === null) {
    res.status(404).end();
  } else {
    let state = await job.getState();
    let progress = job.progress();
    let reason = job.failedReason;
    res.json({ id, state, progress, reason, result: jobIdResultMap[id] });
  }
});

// You can listen to global events to get notified when jobs are processed
workQueue.on('global:completed', (jobId, result) => {
  console.log(`${jobId} successfully completed`);
  jobIdResultMap[jobId] = JSON.parse(result);
});

app.listen(PORT, () => console.log(`✅  API Server started on port ${PORT}`));
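As an aside, Bull also keeps a completed job's return value on the job object itself (as `job.returnvalue`), which could replace the in-memory map entirely. A minimal sketch, assuming the Express `app` and Bull `workQueue` set up as above (`jobResultPayload` and the route path are made-up names, not part of Bull):

```javascript
// Pure helper: shape the API response from a job's state and stored return value.
function jobResultPayload(id, state, returnvalue) {
  return { id, state, result: state === "completed" ? returnvalue : null };
}

// Route sketch (not wired up here): read the result straight off the job
// instead of caching it in a separate object.
function registerResultRoute(app, workQueue) {
  app.get("/api/jobresult/:id", async (req, res) => {
    const job = await workQueue.getJob(req.params.id);
    if (!job) return res.status(404).end();
    const state = await job.getState();
    // Bull stores the processor's return value on the job as `job.returnvalue`.
    res.json(jobResultPayload(job.id, state, job.returnvalue));
  });
}
```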

worker.js:

let throng = require("throng");
let Queue = require("bull");

// Connect to a local Redis instance in development, and the Heroku-provided URL in production
let REDIS_URL = process.env.REDIS_URL || "redis://127.0.0.1:6379";

// Spin up multiple processes to handle jobs to take advantage of more CPU cores
// See: https://devcenter.heroku.com/articles/node-concurrency for more info
let workers = process.env.WEB_CONCURRENCY || 2;

// The maximum number of jobs each worker should process at once. This will need
// to be tuned for your application. If each job is mostly waiting on network
// responses it can be much higher. If each job is CPU-intensive, it might need
// to be much lower.
let maxJobsPerWorker = 50;

function sleep(ms) {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

function start() {
  // Connect to the named work queue
  let workQueue = new Queue("work", REDIS_URL);

  workQueue.process(maxJobsPerWorker, async (job) => {
    // This is an example job that just slowly reports on progress
    // while doing no work. Replace this with your own job logic.
    let progress = 0;

    while (progress < 100) {
      await sleep(50);
      progress += 10;
      job.progress(progress);
    }

    // A job can return values that will be stored in Redis as JSON.
    return { value: "This will be stored" };
  });
}

// Initialize the clustered worker process
// See: https://devcenter.heroku.com/articles/node-concurrency for more info
throng({ workers, start });

1 Answer


The recommended way of doing this is to use a job queue and a message queue.

At the time of writing, BullMQ's documentation is incomplete, so you should look at the docs from Bull.

From the documentation:

Returning Job Completions

A common pattern is where you have a cluster of queue processors that just process jobs as fast as they can, and some other services that need to take the results of these processors and do something with them, maybe storing the results in a database.

The most robust and scalable way to accomplish this is by combining the standard job queue with the message queue pattern: a service sends jobs to the cluster simply by opening a job queue and adding jobs to it, and the cluster starts processing as fast as it can. Every time a job is completed in the cluster, a message is sent to a results message queue with the result data; this queue is listened to by some other service that stores the results in a database.
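The pattern above can be sketched with Bull itself by using a second queue for the results. Everything here is illustrative: the helper `resultMessage`, the wiring function, and `saveToDatabase` are assumptions for the sketch, not part of Bull's API:

```javascript
// Pure helper: wrap a completed job's output in a message for the results queue.
function resultMessage(jobId, result) {
  return { jobId, result };
}

// Wiring sketch (not executed here): forward each completed job's result from
// the work queue into a dedicated results queue, which a separate service
// consumes and persists via the caller-supplied saveToDatabase function.
function wireResultsPipeline(workQueue, resultsQueue, saveToDatabase) {
  workQueue.on("global:completed", async (jobId, result) => {
    // Bull delivers the result as a JSON string in global events.
    await resultsQueue.add(resultMessage(jobId, JSON.parse(result)));
  });

  // In the consuming service: process the results queue and store each message.
  resultsQueue.process(async (job) => {
    await saveToDatabase(job.data);
  });
}
```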

Answered 2021-01-14T12:50:06.973