我有大量的工作要运行。如果单线程运行,执行速率将受 CPU 限制(序列化/反序列化等)。我想通过在一组节点工作线程中划分作业来提高吞吐量。问题在于,创造就业机会的速度很容易超过工人消耗的速度。作业集很大,无法放入内存。因此,如果生产者领先于消费者,那么进程会因为内存不足而失败。
这是一个工作示例。当节点配置为最小堆时,它在我的计算机上生成大约 3000 个作业后崩溃--max-old-space-size=4
。
// node 16, 8 cpus
// node --max-old-space-size=4 oom.js
// crashes after producing 3000 jobs
const os = require("os");
const { Worker, parentPort, isMainThread, threadId } = require("worker_threads");
// Produce jobs quickly
async function main() {
const workers = os.cpus().map(() => new Worker(__filename));
let id = 0;
async function* generateJobs() {
while (id < 100_000) {
const message = {
id,
data: "0".repeat(100_000)
};
yield message;
if (id % 1000 === 0) console.log("produced", message.id);
id++;
await new Promise(r => setTimeout(r, 1));
}
}
for await (const history of generateJobs()) {
workers[0].postMessage(history);
// Rotate workers
workers.push(workers.shift());
}
workers.forEach(w => w.postMessage("exit"));
const workerExits = workers.map(w => new Promise(resolve => w.on("exit", resolve)));
await Promise.all(workerExits);
}
// Consume jobs slowly
function worker() {
parentPort.on("message", message => {
if (message === "exit") {
console.log("finish", threadId);
parentPort.close();
return;
}
if (message.id % 1000 === 0) console.log("worker", threadId, "started", message.id);
for (let i = 0; i < 1e8; i++) {} // Simulate a executing a CPU-bound job.
if (message.id % 1000 === 0) console.log("worker", threadId, "finished", message.id);
});
}
if (isMainThread) {
main();
} else {
worker();
}
是否有与节点工作线程一起使用的进程内队列?关键是为生产者提供背压,将排队的作业数量限制在合理的数量。