0

我试图弄清楚如何创建一个流管道,它按需读取 csv 文件中的条目。为此,我想到了使用管道的以下方法(伪代码)

const stream_pipe = input_file_stream.pipe(csv_parser)
// Then getting entries through:
let entry = stream_pipe.read()

不幸的是,经过大量测试后,我发现它们在我设置管道的那一刻,它会自动消耗,直到 csv 文件结束。我试图通过.pause()在最后附加来暂停它的创建,但它似乎没有任何效果。

这是我当前的代码。我正在使用csv_parse库(较大csv包的一部分):

// Read file stream
const file_stream = fs.createReadStream("filename.csv")
const parser = csvParser({
    columns: ['a', 'b'],
    on_record: (record) => {
        // A simple filter as I am interested only in numeric entries
        let a = parseInt(record.a)
        let b = parseInt(record.b)
        return (isNaN(a) || isNaN(b)) ? undefined : record
    }
})
const reader = stream.pipe(parser) // Adding .pause() seems to have no effect
console.log(reader.read()) // Prints `null`

// I found out I can use this strategy to read a few entries immediately, but I cannot break out of it and then resume as the stream will automatically be consumed 
//for await (const record of reader) {
//    console.log(record)
//} 

我一直在努力解决这个问题,但在 csv 包和节点官方文档中都找不到简单的解决方案。

提前感谢任何能让我走上正轨的人:)

4

1 回答 1

0

您可以在读取流时做一件事,您可以创建一个 readLineInterface 并传递输入流和正常输出流,如下所示:

const inputStream = "reading the csv file",
      outputStream = new stream();

// now create a readLineInterface which will read 
// line by line you should use async/await 

const res = await processRecord(readline.createInterface(inputStream, outputStream));

async function processRecord(line) {
   return new Promise((res, rej) => {
       if (line) {
        // do the processing 
        res(line);
       }
       
       rej('Unable to process record');
   })
}

现在 createprocessRecord函数应该逐行获取内容,并且您可以承诺使其顺序化。

注意:上面的代码是一个伪代码,只是为了让您了解事情是否有效,因为我在我的项目中一直在做同样的事情来读取csv文件行和行并且它工作正常。

于 2021-12-10T15:27:48.997 回答