我有一个用于校验 CSV 的 Transform 流。正常情况下它工作良好,但一旦流发出错误,即使我已经发回了响应,请求仍然会挂起。
/**
 * Transform stream constructor used to validate CSV input.
 *
 * May be invoked with or without `new`. The stream is always created with
 * `objectMode: true` and `highWaterMark: 1000000`; values supplied by the
 * caller for those two keys are overridden.
 *
 * @param {object} [options] - Stream options forwarded to Transform. The
 *   caller's object is copied and never modified.
 */
export function ValidateCSV(options) {
  if (!(this instanceof ValidateCSV)) return new ValidateCSV(options);
  // Copy before forcing settings so an options object the caller reuses
  // elsewhere does not silently gain objectMode/highWaterMark.
  const opt = Object.assign({}, options, {
    objectMode: true,
    highWaterMark: 1000000,
  });
  Transform.call(this, opt);
}
util.inherits(ValidateCSV, Transform);
/**
 * Flags the stream as neither readable nor writable, then emits 'end'.
 *
 * NOTE(review): ValidateCSV inherits from Transform, so this replaces the
 * inherited destroy; it takes no error argument and returns nothing.
 */
ValidateCSV.prototype.destroy = function () {
  Object.assign(this, { readable: false, writable: false });
  this.emit('end');
};
/**
 * Transform hook invoked for each incoming chunk.
 *
 * When `required` is non-empty, an 'error' event is emitted whose payload is
 * a string listing those columns. `done` is called afterwards in every case.
 *
 * NOTE(review): `required` is not defined in this block; presumably it is
 * produced by the elided chunk processing. Confirm.
 *
 * @param {*} chunk - The chunk being processed.
 * @param {string} encoding - Encoding argument supplied by the stream.
 * @param {Function} done - Callback signalling that this chunk is handled.
 */
ValidateCSV.prototype._transform = function (chunk, encoding, done) {
  // Do some stuff to the chunk
  // Emit error
  const hasMissingColumns = required.length > 0;
  if (hasMissingColumns) {
    const message = `The following columns are required: ${required.join(', ')}`;
    this.emit('error', message);
  }
  done();
};
我可以通过添加下面的 destroy 方法来缓解这个问题,但它仍然很慢,并且会挂起几秒钟。有没有更好的方法来结束/销毁 Transform 流?
/**
 * Tears the stream down by clearing its `readable` and `writable` flags and
 * then emitting 'end'.
 *
 * NOTE(review): ValidateCSV inherits from Transform, so this replaces the
 * inherited destroy; it takes no error argument and returns nothing.
 */
ValidateCSV.prototype.destroy = function () {
  for (const flag of ['readable', 'writable']) {
    this[flag] = false;
  }
  this.emit('end');
};
编辑:
下面是我结合 busboy 使用该流的方式:
/**
 * Handles a multipart upload: pipes the request into busboy, pipes the
 * uploaded file through a ValidateCSV stream, and answers with JSON.
 *
 * Responses:
 *  - 500 with the error payload when the validator emits 'error' or when
 *    Dataset.create reports an error;
 *  - 200 with `dataset` once the file has ended and been saved (or when
 *    `dataset._update` is set).
 *
 * At most one response is sent per request. After a validation error the
 * request is detached from busboy and drained so the upload does not stall.
 *
 * NOTE(review): `dataset` is not declared in this function; presumably it
 * comes from an enclosing scope. Confirm.
 *
 * @param {object} req - Incoming request (reads `query`, `params`, `headers`).
 * @param {object} res - Response used to send the JSON result.
 */
function processMultipart(req, res) {
  const userId = req.query._userId;
  const busboy = new Busboy({ headers: req.headers, limits: { files: 1 } });
  const updateId = req.params.id;
  // Set once the validator reports an error; later handlers must not save.
  let validationFailed = false;

  // Sends a JSON response unless one has already been started, so a late
  // event cannot trigger a second response.
  const reply = (status, body) => {
    if (res.headersSent) {
      return;
    }
    res.status(status).json(body).end();
  };

  // Transform stream to validate the csv
  const validateCSV = new ValidateCSV();
  validateCSV
    .on('finish', () => {
      // Process the csv
    })
    .on('error', (er) => {
      //Do some logging
      validationFailed = true;
      reply(500, er);
      // Stop feeding the parser and discard the rest of the upload. Without
      // this the request body is never consumed and the request hangs.
      req.unpipe(busboy);
      req.resume();
    });

  // Multipart upload handler
  busboy
    .on('file', (fieldname, file, filename) => {
      dataset.name = fieldname.length > 0 ?
        fieldname : filename.substr(0, filename.indexOf('.csv'));
      file
        .on('error', (er) => {
          //Send Error
        })
        .on('end', () => {
          // A rejected upload must not be persisted or answered with 200.
          if (validationFailed) {
            return;
          }
          // Save dataset to mongo
          if (dataset._update) {
            reply(200, dataset);
            return;
          }
          Dataset.create(dataset, (er) => {
            if (er) {
              reply(500, er);
            } else {
              reply(200, dataset);
            }
          });
        })
        .pipe(validateCSV);
    });

  req.pipe(busboy);
}