1

我必须在 NodeJS 中解析一个非常大的 CSV 文件并将其保存在一个数据库(异步操作)中,一次最多允许 500 个条目。由于内存限制,我必须流式传输 CSV 文件并希望使用 PapaParse 来解析 CSV 文件(因为这在我的情况下效果最好)。

由于 PapaParse 使用回调样式方法来解析 Node.js 流,我没有看到一个容易结合 highland(用于批处理和数据转换)和 PapaParse。因此,我尝试使用 ParseThrough 流向该流写入数据并读取该流以进行批处理:

const csv = require('papaparse');
const fs = require('fs');
const highland = require('highland');
const { PassThrough } = require('stream');

const passThroughStream = new PassThrough({ objectMode: true });

csv.parse(fileStream, {
  step: function(row) {
    // Write data to stream
    passThroughStream.write(row.data[0]);
  },
  complete: function() {
    // Somehow "end" the stream
    passThroughStream.write(null);
  },
});

highland(passThroughStream)
  .map((data) => {
    // data transform
  })
  .batch(500)
  .map((data) => {
    // Save up to 500 entries in database (async call)
  });

显然,这不起作用,也没有真正做任何事情。是否有可能甚至更好的方法来解析非常大的 CSV 文件并将行保存在数据库中(最多 500 个批次)?

编辑:使用csv包(https://www.npmjs.com/package/csv)可能会像这样(相同fast-csv):

highland(fileStream.pipe(csv.parse()))
  .map((data) => {
    // data transform
  })
  .batch(500)
  .map((data) => {
    // Save up to 500 entries in database (async call)
  });

但不幸的是,这两个 NPM 包都不能在所有情况下正确解析 CSV 文件。

4

1 回答 1

4

快速浏览后,papaparse我决定在scramjet.

fileStream.pipe(new scramjet.StringStream('utf-8'))
    .csvParse(options)
    .batch(500)
    .map(items => db.insertArray('some_table', items))

我希望这对你有用。:)

于 2018-02-07T17:04:26.460 回答