3

我正在使用 SendGrid 通过电子邮件接收文件。SendGrid 解析传入的电子邮件并将文件以多部分形式发送到我设置的端点。

我不想要本地磁盘上的文件,所以我将它们直接流式传输到 Amazon S3。这很完美。

但在我可以流式传输到 S3 之前,我需要获取目标邮件地址,以便我可以计算出正确的 s3 文件夹。这是在表单帖子中名为“to”的字段中发送的。不幸的是,这个字段有时会在文件到达之后到达,因此我需要一种方法来等待目标字段,然后才能准备好接收流。

我想我可以将 onField 包装在一个 Promise 中,然后从 onFile 中等待 to-field。但是当字段在文件之后到达时,这个概念似乎将其锁定。

我是摊位流和承诺的新手。如果有人能告诉我如何做到这一点,我将不胜感激。

这是无效的伪代码:

function sendGridUpload(req, res, next) { 
  var busboy = new Busboy({ headers: req.headers });

  var awaitEmailAddress = new Promise(function(resolve, reject) {
    busboy.on('field', function(fieldname, val, fieldnameTruncated, valTruncated) {
      if(fieldname === 'to') {
        resolve(val);
      } else {
        return;
      }
    });
  });


  busboy.on('file', function(fieldname, file, filename, encoding, mimetype) {

    function findInbox(emailAddress) {
      console.log('Got email address: ' + emailAddress);

      ..find the inbox and generate an s3Key
      return s3Key;
    }

    function saveFileStream(s3Key) {
      ..pipe the file directly to S3
    }

    awaitEmailAddress.then(findInbox)
    .then(saveFileStream)
    .catch(function(err) {
      log.error(err)
    });
  });

  req.pipe(busboy);
}
4

1 回答 1

1

我终于得到了这个工作。解决方案不是很漂亮,我实际上已经切换到另一个概念(在帖子末尾描述)。

为了缓冲传入的数据,直到“to”字段到达,我使用了@samcday的流缓冲区。当我获得目标字段时,我将可读流释放到为数据排列的管道中。

这是代码(省略了一些部分,但基本部分在那里)。

var streamBuffers = require('stream-buffers');

function postInboundMail(req, res, next) {
  var busboy = new Busboy({ headers: req.headers});

  //Sometimes the fields arrives after the files are streamed.
  //We need the "to"-field before we are ready for the files
  //Therefore the onField is wrapped in a promise which gets
  //resolved when the to field arrives
  var awaitEmailAddress = new Promise(function(resolve, reject) {
    busboy.on('field', function(fieldname, val, fieldnameTruncated, valTruncated) {
      var emailAddress;

      if(fieldname === 'to') {
        try {
          emailAddress = emailRegexp.exec(val)[1]
          resolve(emailAddress)
        } catch(err) {
          return reject(err);        
        }
      } else {
        return;
      }
    });
  });


  busboy.on('file', function(fieldname, file, filename, encoding, mimetype) {
    var inbox;

    //I'm using readableStreamBuffer to accumulate the data before
    //I get the email field so I can send the stream through to S3
    var readBuf = new streamBuffers.ReadableStreamBuffer();

    //I have to pause readBuf immediately. Otherwise stream-buffers starts
    //sending as soon as I put data in in with put().
    readBuf.pause();

    function getInbox(emailAddress) {
      return model.inbox.findOne({email: emailAddress})
      .then(function(result) {
        if(!result) return Promise.reject(new Error(`Inbox not found for ${emailAddress}`))

        inbox = result;
        return Promise.resolve();
      });
    }

    function saveFileStream() {
      console.log('=========== starting stream to S3 ========= ' + filename)

      //Have to resume readBuf since we paused it before
      readBuf.resume();

      //file.save will approximately do the following:
      // readBuf.pipe(gzip).pipe(encrypt).pipe(S3)
      return model.file.save({
        inbox: inbox,
        fileStream: readBuf
      });
    }

    awaitEmailAddress.then(getInbox)
    .then(saveFileStream)
    .catch(function(err) {
      log.error(err)
    });


    file.on('data', function(data) {
      //Fill readBuf with data as it arrives
      readBuf.put(data);
    });

    file.on('end', function() {
      //This was the only way I found to get the S3 streaming finished.
      //Destroysoon will let the pipes finish the reading bot no more writes are allowed
      readBuf.destroySoon()
    });
  });


  busboy.on('finish', function() {
    res.writeHead(202, { Connection: 'close', Location: '/' });
    res.end();
  });

  req.pipe(busboy);
}

即使我没有使用它,我也非常希望得到有关此解决方案的反馈。我有一种感觉,这可以做得更简单和优雅。

新解决方案: 我没有等待目标字段,而是将流直接发送到 S3。我想,在传入流和 S3 保存之间放入的内容越多,由于代码中的错误而丢失传入文件的风险就越高。(如果我没有回复 200,SendGrid 最终会重新发送文件,但这需要一些时间。)

我就是这样做的:

  1. 在数据库中保存文件的占位符
  2. 将流通过管道传输到 S3
  3. 在占位符到达时使用更多信息更新占位符

此解决方案还使我有机会轻松掌握不成功的上传,因为不成功上传的占位符将不完整。

//迈克尔

于 2015-10-19T06:07:43.443 回答