2

我有一个运行Bull and express 的作业服务器。

服务器要求

接收包含对象的请求并将该对象用作本地程序的输入(别无选择),该程序需要几分钟到几个小时才能运行。作业必须按照收到的顺序一个接一个地运行(不能绕开这个)。

TL;DR 服务器:

// Set up Bull queue and process
const osmQueue = new Bull("osm", {
    redis: {
        port: "6969",
    },
});
osmQueue.process((job) => {
    extractOms(job);
});

// Process function
const extractOms = (job) => {
    // I have tried execSync also
    spawnSync("programme", [
        "this",
        "programme",
        "takes",
        "~30",
        "minutes",
        "to",
        "run",
        "and",
        "requires",
        "full",
        "system",
        "resources (cannot share system resources with another job)",
    ]);

return
};

// Express server with single route
const app = express();
app.post("/create-map-job", (req, res) => {
    console.log("fired"); // Fires on first request. Blocked on second until job done
    osmQueue.add(req.body);
    res.send("job successfully queued"); // Returns on first request. Blocked on second until job done
});

app.listen(PORT, () => {
    console.log(`Listening on ${PORT}`);
});

问题:

  1. 当我将数据发送到这条路线时,一切都按预期工作。作业开始,我从服务器收到“作业成功排队”。
  2. 当我第二次向服务器发送数据时(前一个作业仍在运行),路由中的 console.log 不会触发,并且请求会挂起,直到现有作业完成。一旦现有作业完成,请求就会得到确认,并且下一个作业会排队。

我尝试过的事情:

  1. 使用 spawn() 而不是 spawnSync()。虽然这意味着请求不再被阻止,但这意味着所有作业都同时执行。我研究了 Bull 的并发性,但是当 child_process 像 spawn() 或 exec() 一样异步时,一旦程序成功开始,作业就会被标记为完成 - 它不会等待 spawn() 完成。这意味着服务器认为该作业已完成并愉快地加载了另一个作业,并且我很快耗尽了内存并且系统崩溃了。我根本无法限制或控制内存使用量。如果我需要为每个进程提供更多内存,那么我一次只能运行 1 个

  2. 在 osmQueue.add() 之前简单调用 res.send()。这对行为没有任何改变。

  3. 在队列中使用限制器:{max, duration} 选项。如果我将限制持续时间设置为 5 小时,这会起作用,但这会大大减少我一次可以做的工作量到无法接受的低水平。

  4. 我一直在阅读并搜索了很长一段时间,但我找不到与我类似的问题。

问题:

  1. 为什么系统进程在执行 res.send() 之后会阻塞 node/express?
  2. 我应该使用另一个库吗?
  3. 非常感谢知情人士的任何见解。

让我知道是否还有其他可以添加的内容,我会尽快完成。

TL;博士要求:

作为作业的一部分按顺序执行系统进程,一个接一个地执行,而不会阻止服务器在现有作业运行时排队更多作业或响应请求。

4

1 回答 1

0

解决了; 令人惊奇的是,完整地写出一个问题可以激发大脑并让您重新审视事物。留给未来的 Google 员工。

请参阅 Bull 文档 > https://github.com/OptimalBits/bull#separate-processes

我需要调用 Bull 的独立进程。这允许我在与 node/express 进程分开的进程中运行阻塞代码,这意味着即使同步代码正在运行,未来的请求也不会被阻塞。

// osm.processor.js
module.exports = extractOms (job) {
    spawnSync("programme", [
        "this",
        "programme",
        "takes",
        "~30",
        "minutes",
        "to",
        "run",
        "and",
        "requires",
        "full",
        "system",
        "resources (cannot share system resources with another job)",
    ]);
    return;
}
// queue.js
osmQueue.process(
    "/path/to/file/above/job-server/processors/osm.processor.js"
);

这会在单独的进程中产生阻塞工作。谢谢公牛!

于 2022-01-05T15:59:31.360 回答