I have a large amount of work to run. If it runs single-threaded, throughput is CPU-bound (serialization/deserialization, etc.). I want to increase throughput by dividing the jobs among a pool of Node worker threads. The problem is that jobs can easily be produced faster than the workers consume them. The job set is large and doesn't fit in memory, so if the producer gets ahead of the consumers, the process fails with an out-of-memory error.

Here is a working example. With Node configured with a tiny heap (--max-old-space-size=4), it crashes on my machine after producing about 3000 jobs.

// node 16, 8 cpus
// node --max-old-space-size=4 oom.js
// crashes after producing 3000 jobs

const os = require("os");
const { Worker, parentPort, isMainThread, threadId } = require("worker_threads");

// Produce jobs quickly
async function main() {
  const workers = os.cpus().map(() => new Worker(__filename));

  let id = 0;
  async function* generateJobs() {
    while (id < 100_000) {
      const message = {
        id,
        data: "0".repeat(100_000)
      };
      yield message;
      if (id % 1000 === 0) console.log("produced", message.id);

      id++;
      await new Promise(r => setTimeout(r, 1));
    }
  }

  for await (const history of generateJobs()) {
    workers[0].postMessage(history);

    // Rotate workers
    workers.push(workers.shift());
  }

  workers.forEach(w => w.postMessage("exit"));
  const workerExits = workers.map(w => new Promise(resolve => w.on("exit", resolve)));
  await Promise.all(workerExits);
}

// Consume jobs slowly
function worker() {
  parentPort.on("message", message => {
    if (message === "exit") {
      console.log("finish", threadId);
      parentPort.close();
      return;
    }
    if (message.id % 1000 === 0) console.log("worker", threadId, "started", message.id);
    for (let i = 0; i < 1e8; i++) {} // Simulate executing a CPU-bound job.
    if (message.id % 1000 === 0) console.log("worker", threadId, "finished", message.id);
  });
}

if (isMainThread) {
  main();
} else {
  worker();
}

Is there an in-process queue that works with Node worker threads? The point is to give the producer backpressure, capping the number of queued jobs at some reasonable bound.

1 Answer

Here is one way to solve the problem. When a worker asks for a job, the producer generates one and sends it to the requesting worker. Each worker buffers a fixed number of jobs at startup, and after finishing each job it asks for the next one. The system therefore has a bounded number of in-flight jobs, so the memory required is bounded as well.

// node 16, 8 cpus
// node --max-old-space-size=4 rr
// finishes: 5444.60s user 13.14s system 735% cpu 12:21.98 total

const os = require("os");
const { Worker, parentPort, isMainThread, threadId } = require("worker_threads");

// Produce jobs quickly
async function main() {
  const workers = os.cpus().map(() => new Worker(__filename));

  let id = 0;
  async function* generateJobs() {
    while (id < 100_000) {
      const message = {
        id,
        data: "0".repeat(100_000)
      };
      yield message;
      if (id % 1000 === 0) console.log("produced", message.id);

      id++;
      await new Promise(r => setTimeout(r, 1));
    }
  }

  const generator = generateJobs();
  workers.forEach(w =>
    w.on("message", async () => {
      const { value, done } = await generator.next();
      if (done) {
        w.postMessage("exit");
      } else {
        w.postMessage(value);
      }
    })
  );

  const workerExits = workers.map(w => new Promise(resolve => w.on("exit", resolve)));
  await Promise.all(workerExits);
}

// Consume jobs slowly
function worker() {
  parentPort.on("message", message => {
    if (message === "exit") {
      console.log("exit", threadId);
      parentPort.close();
      return;
    }
    if (message.id % 1000 === 0) console.log("worker", threadId, "started", message.id);
    for (let i = 0; i < 1e8; i++) {} // Simulate executing a CPU-bound job.
    if (message.id % 1000 === 0) console.log("worker", threadId, "finished", message.id);
    parentPort.postMessage("ready"); // after one job finishes, ask for the next job
  });

  // Buffer 10 unstarted jobs, to make sure the worker stays busy.
  for (let i = 0; i < 10; i++) {
    parentPort.postMessage("ready");
  }
}

if (isMainThread) {
  main();
} else {
  worker();
}

Another way to solve the problem would be to implement the queue with a SharedArrayBuffer. In that case, Atomics wait and notify would take the place of MessagePort postMessage. A SharedArrayBuffer queue would probably have lower overhead. On the other hand, it would be more complex to implement and would probably place stricter limits on the shape of the message objects.
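
A rough sketch of just the backpressure half of that idea: a counting semaphore kept in a SharedArrayBuffer, where the producer blocks in Atomics.wait while MAX_IN_FLIGHT jobs are outstanding and each worker releases a slot with Atomics.notify when it finishes. MAX_IN_FLIGHT and the round-robin dispatch are placeholders, and the job payloads still travel over postMessage, so a full SharedArrayBuffer queue would additionally have to encode the payload into shared memory.

const os = require("os");
const { Worker, parentPort, isMainThread, workerData } = require("worker_threads");

const MAX_IN_FLIGHT = 16; // assumed bound on queued + running jobs

if (isMainThread) {
  // Index 0 holds the number of free job slots, shared with every worker.
  const slots = new Int32Array(new SharedArrayBuffer(4));
  Atomics.store(slots, 0, MAX_IN_FLIGHT);

  const workers = os.cpus().map(() => new Worker(__filename, { workerData: slots }));

  for (let id = 0; id < 100_000; id++) {
    // Acquire a slot, blocking this thread while none are free.
    for (;;) {
      const free = Atomics.load(slots, 0);
      if (free > 0 && Atomics.compareExchange(slots, 0, free, free - 1) === free) break;
      Atomics.wait(slots, 0, 0); // sleeps only while the free count is still 0
    }
    workers[id % workers.length].postMessage({ id, data: "0".repeat(100_000) });
  }
  workers.forEach(w => w.postMessage("exit"));
} else {
  const slots = workerData;
  parentPort.on("message", message => {
    if (message === "exit") return parentPort.close();
    for (let i = 0; i < 1e8; i++) {} // Simulate executing a CPU-bound job.
    Atomics.add(slots, 0, 1);    // release the slot...
    Atomics.notify(slots, 0, 1); // ...and wake the producer if it is blocked
  });
}

Note that Atomics.wait blocks the calling thread, which is acceptable here only because the producer loop is synchronous; with an asynchronous producer you would run it in its own worker thread or use Atomics.waitAsync where available.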

Answered 2021-12-22T00:48:39.793