0

我有一个脚本,可以从服务器下载数千个文件,对这些文件执行一些 CPU 密集型计算,然后将结果上传到某个地方。作为一个附加级别的复杂性,我想限制与我正在下载文件的服务器的并发连接数。

为了让 CPU 密集型计算脱离事件线程,我利用了 josdejong 的 workerpool。我还想我可以利用这样一个事实,即在任何给定时间只会启动有限数量的线程来限制与我的服务器的并发连接数,所以我尝试将网络 I/O 放在工作进程中,例如所以(打字稿):

import Axios from "axios";
import workerpool from "workerpool";

const pool = workerpool.pool({
    minWorkers: "max",
});

async function processData(file: string) {
    console.log("Downloading " + file);
    const csv = await Axios.request<IncomingMessage>({
        method: "GET",
        url: file,
        responseType: "stream"
    });
    console.log(csv);
    // TODO: Will process the file here
}

export default async function (files: string[]) {
    const promiseArray: workerpool.Promise<Promise<void>>[] = [];
    // Only processing the first file for now during testing
    files.slice(0, 1).forEach((file) => {
        promiseArray.push(pool.exec(processData, [file]));
    });
    await Promise.allSettled(promiseArray);
    await pool.terminate();
}

当我编译并运行此代码时,我看到消息“正在下载 test.txt”,但之后我看不到以下日志语句 ( console.log(csv))

我已经尝试对此代码进行各种修改,包括删除responseType、删除await和仅检查PromiseAxios 返回的、使函数非异步等。不管它似乎总是Axios.request在线崩溃

工作线程是否无法打开 HTTP 连接或其他什么?还是我只是犯了一个愚蠢的错误?

4

2 回答 2

1

如果它没有到达这行代码:

console.log(csv);

然后,要么Axios.request()永远不会履行其承诺,要么该承诺被拒绝。您在任何这些函数中都没有错误处理,因此如果它被拒绝,您将不知道也不会记录问题。作为初学者,我建议您检测您的代码,以便您可以记录任何拒绝:

async function processData(file: string) {
    try {
        console.log("Downloading " + file);
        const csv = await Axios.request<IncomingMessage>({
            method: "GET",
            url: file,
            responseType: "stream"
        });
        console.log(csv);
    } catch(e) {
        console.log(e);          // log an error
        throw e;                 // propagate rejection/error
    }

}

作为代码设计的一般要点,您应该在某个级别捕获并记录任何可能的 Promise 拒绝。您不必在最低调用级别捕获它们,因为它们会通过返回的 Promise 传播,但您确实需要在某个地方捕获任何可能的拒绝,并且为了您自己的开发理智,您需要记录它以便您可以看看它何时发生以及错误是什么。

于 2020-10-22T04:30:14.087 回答
-2

您不能在工作线程中执行 TypeScript。该pool.exec方法接受静态 JavaScript 函数或具有相同函数的 JavaScript 文件的路径。

这是workerpool 自述文件的引述:

请注意,函数和参数都必须是静态的和可字符串化的,因为它们需要以序列化的形式发送给工作人员。在大型函数或函数参数的情况下,将数据发送给工作人员的开销可能很大。

我正在尝试使用 TypeScript 来完成这项工作。解决此问题的可能方法是:

  • 在 TypeScript 中编写一个工作函数,使用任何捆绑器将其编译为单独的包,然后将编译文件的路径传递给pool.exec. 我设法完成了这项工作,但我唯一不满意的是,使用这个解决方案你不能使用 nodemon(如果你使用它)
  • 使用编译 TS 源代码并使用ts-node执行它的 JS 包装器。然后将该包装器的路径传递给pool.exec函数。此解决方案不适用于捆绑器
于 2020-11-19T20:37:20.813 回答