0

我从 Busboy 获得一个文件流,然后我将它传送到一个自定义转换流以验证和清理它。它适用于小文件,但随着它们变大,我的自定义流不会等待 busboy 流完成并被截断。

这是busboy代码:

busboy
.on("file", function(fieldname, file, filename, encoding, mimetype) {
    //Creating a mongo doc first
    Dataset.create(dataset, function (err, ds) {
        if(err) {...} 
        else {
            file.pipe(validateCSV));
        }
    });

    validateCSV
        .on("finish", function() {
            // Send to Data Import
            datasetService.import(validateCSV, dataset, function (err, result) {
                ...
            });
        });
});

还有我的转换流:

module.exports.ValidateCSV = ValidateCSV;
function ValidateCSV(options) {
    if (!(this instanceof ValidateCSV)) return new ValidateCSV(options);

    if (!options) options = {};
    options.objectMode = true;
    Transform.call(this, options);
}

util.inherits(ValidateCSV, Transform);

ValidateCSV.prototype._transform = function (chunk, encoding, done) {
    if (this._checked) {
        this.push(chunk);
    } else {
        //Do some validation
        var data = chunk.toString();
        var lines = data.match(/[^\r\n]+/g);
        var headerline = lines[0] || "";
        var header = headerline.split(",");
        ...
        this._checked = true;
        this.push(chunk);
    }
    done()
}
4

1 回答 1

0

事实证明这是一个背压问题,在转换流上看到 HighWaterMark 选项修复了它。理想情况下,它会根据上传的文件大小进行设置,但这对我来说是固定的:

function ValidateCSV(options) {
    if (!(this instanceof ValidateCSV)) return new ValidateCSV(options);

    if (!options) options = {};
    options.objectMode = true;
    options.highWaterMark = 100000;
    Transform.call(this, options);
}
于 2016-02-18T12:50:52.217 回答