我需要构建一个函数来处理用于 bluebird.map() 调用的大型 CSV 文件。鉴于文件的潜在大小,我想使用流式传输。
这个函数应该接受一个流(一个 CSV 文件)和一个函数(处理来自流的块),并在文件被读取结束(已解决)或错误(被拒绝)时返回一个承诺。
所以,我开始:
'use strict';
var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');
var pgp = require('pg-promise')({promiseLib: promise});
api.parsers.processCsvStream = function(passedStream, processor) {
var parser = csv.parse(passedStream, {trim: true});
passedStream.pipe(parser);
// use readable or data event?
parser.on('readable', function() {
// call processor, which may be async
// how do I throttle the amount of promises generated
});
var db = pgp(api.config.mailroom.fileMakerDbConfig);
return new Promise(function(resolve, reject) {
parser.on('end', resolve);
parser.on('error', reject);
});
}
现在,我有两个相互关联的问题:
- 我需要限制正在处理的实际数据量,以免造成内存压力。
- 作为
processor
参数传递的函数通常是异步的,例如通过基于 promise 的库(现在:)将文件的内容保存到 dbpg-promise
。因此,它将在记忆中创造一个承诺,并不断地继续前进。
该pg-promise
库具有管理此功能的功能,例如page(),但我无法提前说明如何将流事件处理程序与这些 Promise 方法混合使用。现在,我readable
在每个之后的部分的处理程序中返回了一个承诺read()
,这意味着我创建了大量承诺的数据库操作,并最终因为我达到了进程内存限制而出错。
有没有人有一个我可以用作跳跃点的工作示例?
更新:可能不止一种给猫剥皮的方法,但这有效:
'use strict';
var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');
var pgp = require('pg-promise')({promiseLib: promise});
api.parsers.processCsvStream = function(passedStream, processor) {
// some checks trimmed out for example
var db = pgp(api.config.mailroom.fileMakerDbConfig);
var parser = csv.parse(passedStream, {trim: true});
passedStream.pipe(parser);
var readDataFromStream = function(index, data, delay) {
var records = [];
var record;
do {
record = parser.read();
if(record != null)
records.push(record);
} while(record != null && (records.length < api.config.mailroom.fileParserConcurrency))
parser.pause();
if(records.length)
return records;
};
var processData = function(index, data, delay) {
console.log('processData(' + index + ') > data: ', data);
parser.resume();
};
parser.on('readable', function() {
db.task(function(tsk) {
this.page(readDataFromStream, processData);
});
});
return new Promise(function(resolve, reject) {
parser.on('end', resolve);
parser.on('error', reject);
});
}
有人认为这种方法存在潜在问题吗?