12

我需要构建一个函数来处理用于 bluebird.map() 调用的大型 CSV 文件。鉴于文件的潜在大小,我想使用流式传输。

这个函数应该接受一个流(一个 CSV 文件)和一个函数(处理来自流的块),并在文件被读取结束(已解决)或错误(被拒绝)时返回一个承诺。

所以,我开始:

'use strict';

var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');

var pgp = require('pg-promise')({promiseLib: promise});

api.parsers.processCsvStream = function(passedStream, processor) {

  var parser = csv.parse(passedStream, {trim: true});
  passedStream.pipe(parser);

  // use readable or data event?
  parser.on('readable', function() {
    // call processor, which may be async
    // how do I throttle the amount of promises generated
  });

  var db = pgp(api.config.mailroom.fileMakerDbConfig);

  return new Promise(function(resolve, reject) {
    parser.on('end', resolve);
    parser.on('error', reject);
  });

}

现在,我有两个相互关联的问题:

  1. 我需要限制正在处理的实际数据量,以免造成内存压力。
  2. 作为processor参数传递的函数通常是异步的,例如通过基于 promise 的库(现在:)将文件的内容保存到 db pg-promise。因此,它将在记忆中创造一个承诺,并不断地继续前进。

pg-promise库具有管理此功能的功能,例如page(),但我无法提前说明如何将流事件处理程序与这些 Promise 方法混合使用。现在,我readable在每个之后的部分的处理程序中返回了一个承诺read(),这意味着我创建了大量承诺的数据库操作,并最终因为我达到了进程内存限制而出错。

有没有人有一个我可以用作跳跃点的工作示例?

更新:可能不止一种给猫剥皮的方法,但这有效:

'use strict';

var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');

var pgp = require('pg-promise')({promiseLib: promise});

api.parsers.processCsvStream = function(passedStream, processor) {

  // some checks trimmed out for example

  var db = pgp(api.config.mailroom.fileMakerDbConfig);
  var parser = csv.parse(passedStream, {trim: true});
  passedStream.pipe(parser);

  var readDataFromStream = function(index, data, delay) {
    var records = [];
    var record;
    do {
      record = parser.read();
      if(record != null)
        records.push(record);
    } while(record != null && (records.length < api.config.mailroom.fileParserConcurrency))
    parser.pause();

    if(records.length)
      return records;
  };

  var processData = function(index, data, delay) {
    console.log('processData(' + index + ') > data: ', data);
    parser.resume();
  };

  parser.on('readable', function() {
    db.task(function(tsk) {
      this.page(readDataFromStream, processData);
    });
  });

  return new Promise(function(resolve, reject) {
    parser.on('end', resolve);
    parser.on('error', reject);
  });
}

有人认为这种方法存在潜在问题吗?

4

4 回答 4

8

你可能想看看promise-streams

var ps = require('promise-streams');
passedStream
  .pipe(csv.parse({trim: true}))
  .pipe(ps.map({concurrent: 4}, row => processRowDataWhichMightBeAsyncAndReturnPromise(row)))
  .wait().then(_ => {
    console.log("All done!");
  });

适用于背压和一切。

于 2015-10-27T04:39:25.090 回答
5

在下面找到一个完整的应用程序,它可以正确执行您想要的相同类型的任务:它将文件作为流读取,将其解析为 CSV 并将每一行插入数据库。

const fs = require('fs');
const promise = require('bluebird');
const csv = require('csv-parse');
const pgp = require('pg-promise')({promiseLib: promise});

const cn = "postgres://postgres:password@localhost:5432/test_db";
const rs = fs.createReadStream('primes.csv');

const db = pgp(cn);

function receiver(_, data) {
    function source(index) {
        if (index < data.length) {
            // here we insert just the first column value that contains a prime number;
            return this.none('insert into primes values($1)', data[index][0]);
        }
    }

    return this.sequence(source);
}

db.task(t => {
    return pgp.spex.stream.read.call(t, rs.pipe(csv()), receiver);
})
    .then(data => {
        console.log('DATA:', data);
    }
    .catch(error => {
        console.log('ERROR:', error);
    });

请注意,我唯一改变的是:使用 librarycsv-parse而不是csv, 作为更好的选择。

添加了对spex库中的方法stream.read的使用,该方法正确地提供了一个可读流以与 Promise 一起使用。

于 2015-10-15T02:47:03.560 回答
2

我找到了一种更好的方法来做同样的事情;有更多的控制权。这是一个具有精确并行控制的最小骨架。以并行值为一个,所有记录都按顺序处理,而无需将整个文件放在内存中,我们可以增加并行值以加快处理速度。

      const csv = require('csv');
      const csvParser = require('csv-parser')
      const fs = require('fs');

      const readStream = fs.createReadStream('IN');
      const writeStream = fs.createWriteStream('OUT');

      const transform = csv.transform({ parallel: 1 }, (record, done) => {
                                           asyncTask(...) // return Promise
                                           .then(result => {
                                             // ... do something when success
                                             return done(null, record);
                                           }, (err) => {
                                             // ... do something when error
                                             return done(null, record);
                                           })
                                       }
                                     );

      readStream
      .pipe(csvParser())
      .pipe(transform)
      .pipe(csv.stringify())
      .pipe(writeStream);

这允许为每条记录执行异步任务。

要返回一个 Promise,我们可以返回一个空的 Promise,并在流完成时完成它。

    .on('end',function() {
      //do something wiht csvData
      console.log(csvData);
    });
于 2020-02-19T10:14:47.193 回答
1

所以说您不想要流式传输而是某种数据块?;-)

你知道https://github.com/substack/stream-handbook吗?

我认为不改变架构的最简单方法是某种承诺池。例如https://github.com/timdp/es6-promise-pool

于 2015-10-14T17:51:13.820 回答