129

我目前正在使用一个名为s3-upload-stream的 node.js 插件将非常大的文件流式传输到 Amazon S3。它使用多部分 API,并且在大多数情况下运行良好。

但是,这个模块已经过时了,我已经不得不对其进行修改(作者也已弃用它)。今天又遇到了亚马逊的一个问题,很想采纳作者的建议,开始使用官方的aws-sdk来完成我的上传。

但。

官方 SDK 似乎不支持管道到s3.upload(). s3.upload 的本质是您必须将可读流作为参数传递给 S3 构造函数。

我有大约 120 多个用户代码模块,它们执行各种文件处理,并且它们不知道其输出的最终目的地。引擎交给他们一个可管道的可写输出流,然后他们通过管道传递给它。我不能给他们一个AWS.S3对象并要求他们upload()在不向所有模块添加代码的情况下调用它。我使用的原因s3-upload-stream是因为它支持管道。

有没有办法让 aws-sdks3.upload()我可以通过管道将流传输到?

4

13 回答 13

164

upload()使用 node.jsstream.PassThrough()流包装 S3函数。

这是一个例子:

inputStream
  .pipe(uploadFromStream(s3));

function uploadFromStream(s3) {
  var pass = new stream.PassThrough();

  var params = {Bucket: BUCKET, Key: KEY, Body: pass};
  s3.upload(params, function(err, data) {
    console.log(err, data);
  });

  return pass;
}
于 2016-05-21T17:31:15.740 回答
142

答案有点晚,希望它可能对其他人有所帮助。您可以返回可写流和承诺,因此您可以在上传完成时获取响应数据。

const AWS = require('aws-sdk');
const stream = require('stream');

const uploadStream = ({ Bucket, Key }) => {
  const s3 = new AWS.S3();
  const pass = new stream.PassThrough();
  return {
    writeStream: pass,
    promise: s3.upload({ Bucket, Key, Body: pass }).promise(),
  };
}

您可以按如下方式使用该功能:

const { writeStream, promise } = uploadStream({Bucket: 'yourbucket', Key: 'yourfile.mp4'});
const readStream = fs.createReadStream('/path/to/yourfile.mp4');

const pipeline = readStream.pipe(writeStream);

现在您可以检查承诺:

promise.then(() => {
  console.log('upload completed successfully');
}).catch((err) => {
  console.log('upload failed.', err.message);
});

或使用异步/等待:

try {
    await promise;
    console.log('upload completed successfully');
} catch (error) {
    console.log('upload failed.', error.message);
}

或者作为stream.pipe()返回 stream.Writable,目的地(上面的 writeStream 变量),允许管道链,我们也可以使用它的事件:

 pipeline.on('close', () => {
   console.log('upload successful');
 });
 pipeline.on('error', (err) => {
   console.log('upload failed', err.message)
 });
于 2018-05-11T11:29:35.933 回答
58

在接受的答案中,函数在上传完成之前结束,因此它是不正确的。下面的代码从可读流中正确管道。

上传参考

async function uploadReadableStream(stream) {
  const params = {Bucket: bucket, Key: key, Body: stream};
  return s3.upload(params).promise();
}

async function upload() {
  const readable = getSomeReadableStream();
  const results = await uploadReadableStream(readable);
  console.log('upload complete', results);
}

您还可以更进一步并使用以下方式输出进度信息ManagedUpload

const manager = s3.upload(params);
manager.on('httpUploadProgress', (progress) => {
  console.log('progress', progress) // { loaded: 4915, total: 192915, part: 1, key: 'foo.jpg' }
});

托管上传参考

可用事件列表

于 2017-11-13T01:09:10.987 回答
10

没有一个答案对我有用,因为我想:

  • 管道进入s3.upload()
  • 将结果通过管道s3.upload()传输到另一个流中

接受的答案不做后者。其他的依赖promise api,在使用流管道时工作起来很麻烦。

这是我对已接受答案的修改。

const s3 = new S3();

function writeToS3({Key, Bucket}) {
  const Body = new stream.PassThrough();

  s3.upload({
    Body,
    Key,
    Bucket: process.env.adpBucket
  })
   .on('httpUploadProgress', progress => {
       console.log('progress', progress);
   })
   .send((err, data) => {
     if (err) {
       Body.destroy(err);
     } else {
       console.log(`File uploaded and available at ${data.Location}`);
       Body.destroy();
     }
  });

  return Body;
}

const pipeline = myReadableStream.pipe(writeToS3({Key, Bucket});

pipeline.on('close', () => {
  // upload finished, do something else
})
pipeline.on('error', () => {
  // upload wasn't successful. Handle it
})

于 2019-07-29T08:45:43.160 回答
7

类型脚本解决方案:
此示例使用:

import * as AWS from "aws-sdk";
import * as fsExtra from "fs-extra";
import * as zlib from "zlib";
import * as stream from "stream";

和异步功能:

public async saveFile(filePath: string, s3Bucket: AWS.S3, key: string, bucketName: string): Promise<boolean> { 

         const uploadStream = (S3: AWS.S3, Bucket: string, Key: string) => {
            const passT = new stream.PassThrough();
            return {
              writeStream: passT,
              promise: S3.upload({ Bucket, Key, Body: passT }).promise(),
            };
          };
        const { writeStream, promise } = uploadStream(s3Bucket, bucketName, key);
        fsExtra.createReadStream(filePath).pipe(writeStream);     //  NOTE: Addition You can compress to zip by  .pipe(zlib.createGzip()).pipe(writeStream)
        let output = true;
        await promise.catch((reason)=> { output = false; console.log(reason);});
        return output;
}

在某处调用此方法,例如:

let result = await saveFileToS3(testFilePath, someS3Bucket, someKey, someBucketName);
于 2018-07-25T14:13:08.887 回答
6

在上面最接受的答案中要注意的是:如果您使用管道,则需要在函数中返回传递,

fs.createReadStream(<filePath>).pipe(anyUploadFunction())

function anyUploadFunction () { 
 let pass = new stream.PassThrough();
 return pass // <- Returning this pass is important for the stream to understand where it needs to write to.
}

否则它将默默地进入下一个而不抛出错误,或者会抛出一个错误,TypeError: dest.on is not a function具体取决于您编写函数的方式

于 2019-10-01T06:28:44.220 回答
5

遵循其他答案并使用适用于 Node.js 的最新 AWS 开发工具包,由于 s3 upload() 函数使用 await 语法和 S3 的承诺接受流,因此有一个更清洁、更简单的解决方案:

var model = await s3Client.upload({
    Bucket : bucket,
    Key : key,
    ContentType : yourContentType,
    Body : fs.createReadStream(path-to-file)
}).promise();
于 2020-09-14T17:46:13.853 回答
4

对于那些抱怨当他们使用 s3 api 上传功能并且零字节文件最终在 s3 上(@Radar155 和@gabo)的人 - 我也遇到了这个问题。

创建第二个 PassThrough 流并将所有数据从第一个传递到第二个,并将对该第二个的引用传递给 s3。您可以通过几种不同的方式做到这一点 - 可能一种肮脏的方式是在第一个流上监听“数据”事件,然后将相同的数据写入第二个流 - 与“结束”事件类似 - 只需调用第二个流上的结束函数。我不知道这是否是 aws api 中的错误、节点版本或其他问题 - 但它对我来说解决了这个问题。

以下是它的外观:

var PassThroughStream = require('stream').PassThrough;
var srcStream = new PassThroughStream();

var rstream = fs.createReadStream('Learning/stocktest.json');
var sameStream = rstream.pipe(srcStream);
// interesting note: (srcStream == sameStream) at this point
var destStream = new PassThroughStream();
// call your s3.upload function here - passing in the destStream as the Body parameter
srcStream.on('data', function (chunk) {
    destStream.write(chunk);
});

srcStream.on('end', function () {
    dataStream.end();
});
于 2019-01-11T20:23:01.757 回答
3

如果它可以帮助任何人,我能够成功地从客户端流式传输到 s3:

https://gist.github.com/mattlockyer/532291b6194f6d9ca40cb82564db9d2a

服务器端代码假定req是一个流对象,在我的情况下,它是从客户端发送的,并在标头中设置了文件信息。

const fileUploadStream = (req, res) => {
  //get "body" args from header
  const { id, fn } = JSON.parse(req.get('body'));
  const Key = id + '/' + fn; //upload to s3 folder "id" with filename === fn
  const params = {
    Key,
    Bucket: bucketName, //set somewhere
    Body: req, //req is a stream
  };
  s3.upload(params, (err, data) => {
    if (err) {
      res.send('Error Uploading Data: ' + JSON.stringify(err) + '\n' + JSON.stringify(err.stack));
    } else {
      res.send(Key);
    }
  });
};

是的,它打破了惯例,但如果你看一下要点,它比我使用 multer、busboy 等发现的任何东西都要干净得多......

+1 实用主义,感谢@SalehenRahman 的帮助。

于 2017-04-25T20:17:32.097 回答
3

如果您使用的是 AWS 节点 SDK v3,则有用于上传流/blob/缓冲区的专用模块。

https://www.npmjs.com/package/@aws-sdk/lib-storage

于 2021-10-25T11:08:46.933 回答
2

我正在使用 KnexJS,但在使用他们的流 API 时遇到了问题。我终于修复了它,希望以下内容对某人有所帮助。

const knexStream = knex.select('*').from('my_table').stream();
const passThroughStream = new stream.PassThrough();

knexStream.on('data', (chunk) => passThroughStream.write(JSON.stringify(chunk) + '\n'));
knexStream.on('end', () => passThroughStream.end());

const uploadResult = await s3
  .upload({
    Bucket: 'my-bucket',
    Key: 'stream-test.txt',
    Body: passThroughStream
  })
  .promise();
于 2019-07-02T23:26:51.777 回答
0

创建一个new stream.PassThrough()pipe它的输入流,然后将传递实例传递给主体。

检查以下示例:

function upload(s3, inputStream) {
    const pass = new PassThrough();

    inputStream.pipe(pass);

    return s3.upload(
        {
            Bucket: 'bucket name',
            Key: 'unique file name',
            Body: pass,
        },
        {
            queueSize: 4, // default concurrency
        },
    ).promise()
        .then((data) => console.log(data))
        .catch((error) => console.error(error));
}

于 2021-12-04T14:54:36.427 回答
-5

如果你知道流的大小,你可以使用minio-js像这样上传流:

  s3Client.putObject('my-bucketname', 'my-objectname.ogg', stream, size, 'audio/ogg', function(e) {
    if (e) {
      return console.log(e)
    }
    console.log("Successfully uploaded the stream")
  })
于 2016-05-21T12:53:30.667 回答