0

我正在尝试使用工作线程构建一个 Node 应用程序,分为三个部分。

  • 委托任务的主线程
  • 更新共享数据的专用工作线程
  • 对共享数据运行计算的工作线程池

共享数据以几个SharedArrayBuffer对象的形式运行,就像一个伪数据库。我希望能够在不需要暂停计算的情况下更新数据,而且我可以使用稍微陈旧的数据完成一些任务。我想出的流程是:

  1. 主线程将数据传递给更新线程
  2. 更新线程创建一个全新SharedArrayBuffer的并用更新的数据填充它。
  3. 更新线程将指向新缓冲区的指针返回给主线程。
  4. 主线程将最新的指针缓存在一个变量中,覆盖其先前的值,并将其传递给每个任务的每个工作线程。
  5. 工作线程在执行操作后根本不保留这些指针。

问题是,当我运行一个经常进行更新并换出共享缓冲区的原型时,这似乎会在常驻状态堆栈中产生内存泄漏。垃圾收集似乎会通过几次删除丢弃的缓冲区,但随后它会不断攀升,直到应用程序变慢并最终挂起或崩溃。

当我完成它时,我如何保证它SharedArrayBuffer会被垃圾收集拾取,或者它甚至可能?我已经看到暗示,只要从所有线程中删除对它的所有引用,它最终就会被拾取,但不是一个明确的答案。

我正在使用threads.js库来抽象工作线程操作。这是我的原型的摘要:

应用程序.ts:

import { ModuleThread, Pool, spawn, Worker } from "threads";
import { WriterModule } from "./workers/writer-worker";
import { CalculateModule } from "./workers/calculate-worker";

class App {
    calculatePool = Pool<ModuleThread<CalculateModule>>
        (() => spawn(new Worker('./workers/calculate-worker')), { size: 6 });
    writerThread: ModuleThread<WriterModule>;

    sharedBuffer: SharedArrayBuffer;
    dataView: DataView;

    constructor() {
        this.sharedBuffer = new SharedArrayBuffer(1000000);
        this.dataView = new DataView(this.sharedBuffer);
    }

    async start(): Promise<void> {
        this.writerThread = await spawn<WriterModule>(new Worker('./workers/writer-worker'));
        await this.writerThread.init(this.sharedBuffer);

        await this.update();

        // Arbitrary delay between updates
        setInterval(() => this.update(), 5000);

        while (true) {
            // Arbitrary delay between tasks
            await new Promise<void>(resolve => setTimeout(() => resolve(), 250));

            this.calculate();
        }
    }

    async update(): Promise<void> {
        const updates: any[] = [];
        
        // generates updates

        this.sharedBuffer = await this.writerThread.update(updates);
        this.dataView = new DataView(this.sharedBuffer);
    }

    async calculate(): Promise<void> {
        const task = this.calculatePool.queue(async (calc) => calc.calculate(this.sharedBuffer));
        const sum: number = await task;
        // Use result
    }
}

const app = new App();
app.start();

writer-worker.ts:

import { expose } from "threads";

let sharedBuffer: SharedArrayBuffer;

const writerModule = {
    async init(startingBuffer: SharedArrayBuffer): Promise<void> {
        sharedBuffer = startingBuffer;
    },

    async update(data: any[]): Promise<SharedArrayBuffer> {
        // Arbitrary update time
        await new Promise<void>(resolve => setTimeout(() => resolve(), 500));

        const newSharedBuffer = new SharedArrayBuffer(1000000);

        // Copy some values from the old buffer over, perform some mutations, etc.

        sharedBuffer = newSharedBuffer;

        return sharedBuffer;
    },
}

export type WriterModule = typeof writerModule;
expose(writerModule);

计算工作者.ts

import { expose } from "threads";

const calculateModule = {
    async calculate(sharedBuffer: SharedArrayBuffer): Promise<number> {
        const view = new DataView(sharedBuffer);

        // Arbitrary calculation time
        await new Promise<void>(resolve => setTimeout(() => resolve(), 100));

        // Run arbitrary calculation
        
        return sum;
    }
}

export type CalculateModule = typeof calculateModule;

expose(calculateModule);
4

0 回答 0