1

我想要做的是从 S3 读取文件 - 更新一些信息 - 将其上传回来,全部使用流,而无需在服务器上创建文件的副本。我使用事件流库来解析文件,如下所示:(已更新解决方案!)

let params = {
    Bucket: Bucket,
    Key: Key,
};
let rStream = s3.getObject(params).createReadStream();
let updatedFile = fs.createWriteStream('/tmp/updated-' + Key);

return new Promise((resolve, reject) => {
    let s = rStream
        .pipe(es.split())
        .pipe(es.mapSync(function(data) {
            if(!data) return;

            s.pause();
            let line = data.split(',');

            if(line[1]==='xyz'){
                line[1] = 'xyz11';
            }

            updatedFile.write(line.join(','));
            updatedFile.write('\n');

            s.resume();
        })
        .on('error', function(err) {
            reject(err);
        })
        .on('end', function() {
           updatedFile.end();

           //createReadStream from path of updatedFile
           //s3 upload file logic
           //delete tmp file logic
           resolve(true);
        }));
});

我的问题是,当我返回这个流时,它关闭并且可读:false,所以我不能使用它:

const updatedStream = fs.createReadStream(tmpfilePath); 
            params={
                Bucket: Bucket,
                Key: Key,
                Body: updatedStream
            };
await s3.upload(params)
  .on('httpUploadProgress', (progress) => {
       console.log('progress', progress)
  })
  .send();

try {
   fs.unlink(tmpfilePath, function (err) {
     if (err) throw err;
       console.log("Tmp File deleted successfully.");
      });
} catch(err) {
   console.log("Warning: Unable to delete the tmp file.", err);
}

有什么想法吗?事件流最终关闭了流,这就是为什么当我将其传回时它的可读性:假。

如何创建读取流、更改数据并使流可读以便将其传递给 s3.upload 函数?

解决方案

最后我设法得到这个工作。

我创建了一个 writeStream 临时文件,每次迭代后我都会在其中写入。当流结束时,我关闭 writeStream 以关闭我的临时文件。

然后我将它上传回 S3(从我的临时文件创建一个 readStream)。

上传完成后,我会删除我的临时文件。

4

0 回答 0