0

我创建了一个 cron 作业,它以 JSON 格式从 API 下载大量记录(多达 110+ 百万个对象),将它们写入 CSV、PUT 和 COPY INTO 雪花数据库以执行插入。

我的工作可以超快地处理少于 100k 条记录的下载(最多运行 15 分钟),但是当它执行数百万次下载时速度太慢了,我开发了一个循环来处理每个限制在多个 CSV 文件中对它们进行批处理(每个文件拆分为 100k 行)。

循环看起来像这样:

                    const results_download = [];
                    let top = 100000,
                        skip = 0,
                        loop = true,
                        counter = 0,
                        url = '';

                    const max_tries = 3;
                    try {
                        while (loop) {
                            let try_count = 1;

                            if (threshold_size > 10000000) {
                                // API Threshold size reached, enable batching of the downloads
                                url += '?$top=' + top + '&$skip=' + skip;
                            } else {
                                loop = false;
                            }

                            let data = null;
                            while (try_count <= max_tries) {
                                data = await getData(url);

                                if (downloads.message) {
                                    try_count++;

                                    if (try_count > max_tries) {
                                        results_download.push(data);
                                    }
                                } else {
                                    try_count = max_tries + 1;
                                }
                                // more code
                                if (loop && (skip + top) > count_api) {
                                    loop = false;
                                } else {
                                    skip = skip + top;
                                }
                            }

我认为这需要这么长时间的主要原因是因为我将每次检索的下载限制为 10 万个条目,而且我知道启动不同的 HTTP 连接比从一个获取更多数据更消耗,但我必须这样做,因为任何事情高于该限制导致 axios(我正在使用 GET 数据)转储以下错误:

Error: Cannot create a string longer than 0x3fffffe7 characters
    at Buffer.utf8Slice (<anonymous>)
    at Buffer.toString (buffer.js:797:17)
    at Request.<anonymous> (/home/proj/node_modules/request/request.js:1128:39)
    at Request.emit (events.js:315:20)
    at IncomingMessage.<anonymous> (/home/proj/node_modules/request/request.js:1076:12)
    at Object.onceWrapper (events.js:421:28)
    at IncomingMessage.emit (events.js:327:22)
    at endReadableNT (_stream_readable.js:1220:12)
    at processTicksAndRejections (internal/process/task_queues.js:84:21) {
  code: 'ERR_STRING_TOO_LONG'
}

当我尝试从调用本身检索 300k 记录时不会发生此错误,但是当我尝试在作业运行时获取那么多记录时会发生此错误,它通过对许多记录的承诺处理 3 个并行下载。所以理论上我最多可以有3个“线程”一次下载4+百万条记录,每个线程将执行该过程13次。

我阅读了一些可能的解决方案,包括使用 Streams 从响应中读取数据并将它们处理成 CSV,但我对数据工程真的很陌生,我想知道使用 Node.js 来实现这样一个真正好的解决方案具体问题。我真的很想得到建议,而不是完整的解决方案。

非常感谢,如果需要更多上下文,我可以编辑问题并提供对问题的更多见解。

额外信息

  • 作业在部署为应用程序的 EKS 环境中运行。
4

0 回答 0