我终于让它正常运行了。这个解决方案不是很漂亮,而且我实际上已经切换到了另一个思路(在帖子末尾描述)。
为了在"to"字段到达之前缓冲传入的数据,我使用了@samcday的stream-buffers。当"to"字段到达后,我再释放可读流,让缓冲的数据流入已为其准备好的管道。
这是代码(省略了一些部分,但基本部分在那里)。
var streamBuffers = require('stream-buffers');
/**
 * Handle an inbound-mail multipart POST (e.g. from SendGrid).
 *
 * Each uploaded file is buffered in a ReadableStreamBuffer until the
 * "to" field has arrived, the matching inbox has been resolved, and the
 * buffered stream can be piped on to S3 via model.file.save.
 *
 * @param {http.IncomingMessage} req  - incoming multipart request
 * @param {http.ServerResponse}  res  - responds 202 once busboy finishes
 * @param {Function}             next - Express next callback (unused)
 */
function postInboundMail(req, res, next) {
  var busboy = new Busboy({ headers: req.headers });

  // Sometimes the fields arrive after the files are streamed, but the
  // "to" field is needed before the files can be routed. Wrap the field
  // handler in a promise that settles once the "to" field shows up.
  var awaitEmailAddress = new Promise(function(resolve, reject) {
    busboy.on('field', function(fieldname, val, fieldnameTruncated, valTruncated) {
      if (fieldname !== 'to') return;
      try {
        // exec() returns null when val does not match, so the [1]
        // access throws and the promise is rejected with that error.
        resolve(emailRegexp.exec(val)[1]);
      } catch (err) {
        reject(err);
      }
    });
  });

  busboy.on('file', function(fieldname, file, filename, encoding, mimetype) {
    var inbox;

    // Accumulate file data until the email address is known, so the
    // whole stream can then be sent through to S3.
    var readBuf = new streamBuffers.ReadableStreamBuffer();
    // Pause immediately: stream-buffers starts emitting as soon as data
    // is put() in otherwise.
    readBuf.pause();

    // Look up the inbox for the resolved address; reject if none exists.
    function getInbox(emailAddress) {
      return model.inbox.findOne({ email: emailAddress })
        .then(function(result) {
          if (!result) {
            return Promise.reject(new Error(`Inbox not found for ${emailAddress}`));
          }
          inbox = result;
        });
    }

    // Resume the buffered stream and hand it to the persistence layer,
    // which pipes it approximately as:
    //   readBuf.pipe(gzip).pipe(encrypt).pipe(S3)
    function saveFileStream() {
      console.log('=========== starting stream to S3 ========= ' + filename);
      readBuf.resume();
      return model.file.save({
        inbox: inbox,
        fileStream: readBuf
      });
    }

    awaitEmailAddress.then(getInbox)
      .then(saveFileStream)
      .catch(function(err) {
        log.error(err);
        // Tear down the buffer on failure so data accumulated for an
        // upload that will never be saved is released instead of
        // leaking for the lifetime of the request.
        readBuf.destroy();
      });

    file.on('data', function(data) {
      // Fill readBuf with data as it arrives.
      readBuf.put(data);
    });

    file.on('end', function() {
      // destroySoon() lets the downstream pipes finish reading while
      // disallowing further writes — the only way found to get the S3
      // streaming to finish.
      readBuf.destroySoon();
    });
  });

  busboy.on('finish', function() {
    res.writeHead(202, { Connection: 'close', Location: '/' });
    res.end();
  });

  req.pipe(busboy);
}
即使我没有使用它,我也非常希望得到有关此解决方案的反馈。我有一种感觉,这可以做得更简单和优雅。
新解决方案:
我没有等待目标字段,而是将流直接发送到 S3。我想,在传入流和 S3 保存之间放入的内容越多,由于代码中的错误而丢失传入文件的风险就越高。(如果我没有回复 200,SendGrid 最终会重新发送文件,但这需要一些时间。)
我就是这样做的:
- 在数据库中保存文件的占位符
- 将流通过管道传输到 S3
- 在其余字段到达时,用更多信息更新占位符
此解决方案还让我能够轻松发现不成功的上传,因为失败上传对应的占位符会是不完整的。
//迈克尔