6

假设您有一个巨大的 (> 1GB) CSV 记录 ID:

655453
4930285
493029
4930301
493031
...

对于每个id您想要调用 REST API 来获取记录数据、在本地对其进行转换并将其插入到本地数据库中的操作。

你如何使用 Node.js 的ReadableStream做到这一点?

我的问题基本上是这样的:你如何逐行读取一个非常大的文件,为每一行运行一个异步函数,并且[可选地]能够从特定行开始读取文件?

从以下 Quora 问题我开始学习使用fs.createReadStream

http://www.quora.com/What-is-the-best-way-to-read-a-file-line-by-line-in-node-js

var fs = require('fs');
var lazy = require('lazy');

var stream = fs.createReadStream(path, {
  flags: 'r',
  encoding: 'utf-8'
});

new lazy(stream).lines.forEach(function(line) {
  var id = line.toString();
  // pause stream
  stream.pause();
  // make async API call...
  makeAPICall(id, function() {
    // then resume to process next id
    stream.resume();
  });
});

但是,该伪代码不起作用,因为该lazy模块会强制您读取整个文件(作为流,但没有暂停)。所以这种方法似乎行不通。

另一件事是,我希望能够从特定行开始处理此文件。这样做的原因是,处理每个id记录(进行 api 调用、清理数据等)每条记录最多可能需要半秒,所以我不想每次都从文件的开头开始。我正在考虑使用的天真的方法是捕获最后处理的 id 的行号,然后保存它。然后,当您再次解析文件时,您逐行遍历所有 id,直到找到您离开的行号,然后您就可以执行此操作makeAPICall。另一种天真的方法是编写小文件(比如 100 个 id)并一次处理每个文件(足够小的数据集,可以在没有 IO 流的情况下在内存中执行所有操作)。有一个更好的方法吗?

我可以看到这变得多么棘手(以及node-lazy出现的地方),因为chunkinstream.on('data', function(chunk) {});可能只包含一行的一部分(如果 bufferSize 很小,每个块可能是 10 行但因为它id是可变长度的,它可能只是9.5 行或其他)。这就是为什么我想知道解决上述问题的最佳方法是什么。

4

2 回答 2

2

与 Andrew Андрей Листочкин 的回答相关:

您可以使用像bylinedata这样的模块来为每一行获取单独的事件。它是围绕原始文件流的转换流,它data为每个块生成一个事件。这使您可以在每行之后暂停。

byline不会像lazy显然那样将整个文件读入内存。

var fs = require('fs');
var byline = require('byline');

var stream = fs.createReadStream('bigFile.txt');
stream.setEncoding('utf8');

// Comment out this line to see what the transform stream changes.
stream = byline.createStream(stream); 

// Write each line to the console with a delay.
stream.on('data', function(line) {
  // Pause until we're done processing this line.
  stream.pause();

  setTimeout(() => {
      console.log(line);

      // Resume processing.
      stream.resume();
  }, 200);
});
于 2017-11-06T04:44:48.117 回答
1

我想你不需要使用node-lazy. 这是我在Node 文档中找到的内容:

事件data

function (data) { }

data事件发出一个Buffer(默认情况下)或一个stringif setEncoding()被使用。

所以这意味着你调用setEncoding()你的流然后你的data事件回调将接受一个字符串参数。然后在这个回调中你可以调用使用.pause().resume()方法。

伪代码应如下所示:

stream.setEncoding('utf8');
stream.addListener('data', function (line) {
    // pause stream
    stream.pause();
    // make async API call...
    makeAPICall(line, function() {
        // then resume to process next line
        stream.resume();
    });
})

尽管文档没有明确指定流是逐行读取的,但我认为文件流就是这种情况。至少在其他语言和平台中,文本流以这种方式工作,我认为节点流没有任何不同的理由。

于 2012-06-18T08:01:43.773 回答