我正在尝试使用工作线程构建一个 Node 应用程序,分为三个部分。
- 委托任务的主线程
- 更新共享数据的专用工作线程
- 对共享数据运行计算的工作线程池
共享数据以几个SharedArrayBuffer
对象的形式运行,就像一个伪数据库。我希望能够在不需要暂停计算的情况下更新数据,而且我可以使用稍微陈旧的数据完成一些任务。我想出的流程是:
- 主线程将数据传递给更新线程
- 更新线程创建一个全新
SharedArrayBuffer
的并用更新的数据填充它。 - 更新线程将指向新缓冲区的指针返回给主线程。
- 主线程将最新的指针缓存在一个变量中,覆盖其先前的值,并将其传递给每个任务的每个工作线程。
- 工作线程在执行操作后根本不保留这些指针。
问题是,当我运行一个经常进行更新并换出共享缓冲区的原型时,这似乎会在常驻状态堆栈中产生内存泄漏。垃圾收集似乎会通过几次删除丢弃的缓冲区,但随后它会不断攀升,直到应用程序变慢并最终挂起或崩溃。
当我完成它时,我如何保证它SharedArrayBuffer
会被垃圾收集拾取,或者它甚至可能?我已经看到暗示,只要从所有线程中删除对它的所有引用,它最终就会被拾取,但不是一个明确的答案。
我正在使用threads.js库来抽象工作线程操作。这是我的原型的摘要:
应用程序.ts:
import { ModuleThread, Pool, spawn, Worker } from "threads";
import { WriterModule } from "./workers/writer-worker";
import { CalculateModule } from "./workers/calculate-worker";
class App {
calculatePool = Pool<ModuleThread<CalculateModule>>
(() => spawn(new Worker('./workers/calculate-worker')), { size: 6 });
writerThread: ModuleThread<WriterModule>;
sharedBuffer: SharedArrayBuffer;
dataView: DataView;
constructor() {
this.sharedBuffer = new SharedArrayBuffer(1000000);
this.dataView = new DataView(this.sharedBuffer);
}
async start(): Promise<void> {
this.writerThread = await spawn<WriterModule>(new Worker('./workers/writer-worker'));
await this.writerThread.init(this.sharedBuffer);
await this.update();
// Arbitrary delay between updates
setInterval(() => this.update(), 5000);
while (true) {
// Arbitrary delay between tasks
await new Promise<void>(resolve => setTimeout(() => resolve(), 250));
this.calculate();
}
}
async update(): Promise<void> {
const updates: any[] = [];
// generates updates
this.sharedBuffer = await this.writerThread.update(updates);
this.dataView = new DataView(this.sharedBuffer);
}
async calculate(): Promise<void> {
const task = this.calculatePool.queue(async (calc) => calc.calculate(this.sharedBuffer));
const sum: number = await task;
// Use result
}
}
const app = new App();
app.start();
writer-worker.ts:
import { expose } from "threads";
let sharedBuffer: SharedArrayBuffer;
const writerModule = {
async init(startingBuffer: SharedArrayBuffer): Promise<void> {
sharedBuffer = startingBuffer;
},
async update(data: any[]): Promise<SharedArrayBuffer> {
// Arbitrary update time
await new Promise<void>(resolve => setTimeout(() => resolve(), 500));
const newSharedBuffer = new SharedArrayBuffer(1000000);
// Copy some values from the old buffer over, perform some mutations, etc.
sharedBuffer = newSharedBuffer;
return sharedBuffer;
},
}
export type WriterModule = typeof writerModule;
expose(writerModule);
计算工作者.ts
import { expose } from "threads";
const calculateModule = {
async calculate(sharedBuffer: SharedArrayBuffer): Promise<number> {
const view = new DataView(sharedBuffer);
// Arbitrary calculation time
await new Promise<void>(resolve => setTimeout(() => resolve(), 100));
// Run arbitrary calculation
return sum;
}
}
export type CalculateModule = typeof calculateModule;
expose(calculateModule);