
I am stuck on an issue where I need to create and download a zip of multiple files using Node.js. Things I have tried that failed:

https://github.com/archiverjs/node-archiver/issues/364#issuecomment-573508251

https://github.com/kubernetes/kubernetes/issues/90483#issue-606722433

unexpected behavior using zip-stream NPM on Google k8s

In addition, the files are now encrypted as well, so I have to decrypt them on the fly before adding them to the zip.

I solved this too, and my solution works perfectly while the server is running on a local machine, but it fails on Google Kubernetes Engine.

After some more research, I suspect this might be a backpressure issue in Node.js streams, but as described in the docs, backpressure is handled automatically by the pipe method. Is it possible that the receiving speed of the browser does not match the sending/zipping speed of my server? If so, how do I solve this problem?
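For reference, this is my understanding of what pipe() already does about backpressure, as a minimal, hypothetical example (the file names are placeholders):

const { createReadStream, createWriteStream } = require("fs");

// pipe() pauses the source whenever the destination's internal buffer is
// full and resumes it when the destination emits "drain", so a slow
// consumer (e.g. the browser) should not be overwhelmed by a fast source
const source = createReadStream("input.bin");   // placeholder file
const dest = createWriteStream("copy.bin");     // placeholder file
source.pipe(dest);
dest.on("drain", () => console.log("destination buffer emptied, source resumed"));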

All the samples related to the problem are in the links provided above.

In addition to this, the readable stream is passed through a decipher to decrypt it.
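For reference, a minimal sketch of how such a decipher might be created before it is handed to the code below; the algorithm, key, and IV here are assumptions for illustration, not the actual values used:

const crypto = require("crypto");

// assumed algorithm, key, and IV purely for illustration
const algorithm = "aes-256-ctr";
const key = crypto.randomBytes(32);  // 256-bit key
const iv = crypto.randomBytes(16);   // 128-bit IV

const decipher = crypto.createDecipheriv(algorithm, key, iv);
// usage: encryptedReadableStream.pipe(decipher) yields the plaintext stream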

// assumed imports for the snippet below (archiver, request, and throttle packages)
const Archiver = require("archiver");
const Request = require("request");
const Throttle = require("throttle");

const handleEntries = ({ elem, uniqueFiles, archive, speedLimit }) => {
  return new Promise((resolve, reject) => {
    let fileName = elem.fileName;
    const url = elem.url;
    const decipher = elem.decipher;
    // changing fileName if same filename is already added to zip
    if (uniqueFiles[fileName] || uniqueFiles[fileName] === 0) {
      uniqueFiles[fileName]++;
    } else {
      uniqueFiles[fileName] = 0;
    }
    if (uniqueFiles[fileName]) {
      const lastDotIndex = fileName.lastIndexOf(".");
      const name = fileName.substring(0, lastDotIndex);
      const extension = fileName.substring(lastDotIndex + 1);
      fileName = `${name}(${uniqueFiles[fileName]}).${extension}`;
    }
    let readableStream = Request(url);
    // create a "Throttle" instance that reads at speedLimit bps
    if (speedLimit) {
      const throttle = new Throttle({ bps: Number(speedLimit) });
      readableStream = readableStream.pipe(throttle);
    }
    // if file is encrypted, need to decrypt it before piping to zip
    readableStream = decipher ? readableStream.pipe(decipher) : readableStream;
    archive.append(readableStream, { name: fileName });
    readableStream.on("complete", result => {
      console.log("Request stream event complete : ", fileName);
      resolve("done");
      // readableStream.unpipe();
      // readableStream.destroy();
    });
    readableStream
      .on("error", error => {
        console.log("Request stream event error fileName : ", fileName, " error : ", error);
        // readableStream.unpipe();
        // readableStream.destroy();
        resolve("done");
      })
      .on("pipe", result => {
        console.log("Request stream event pipe : ", fileName);
      })
      .on("request", result => {
        console.log("Request stream event request : ", fileName);
      })
      .on("response", result => {
        console.log("Request stream event response : ", fileName);
      })
      .on("socket", result => {
        result.setKeepAlive(true);
        console.log("Request stream event socket : ", fileName);
      });
  });
};

// outputFileName added to the parameters so the Content-Disposition header below resolves
const useArchiver = async ({ resp, urls, speedLimit, outputFileName }) => {
  resp.writeHead(200, {
    "Content-Type": "application/zip",
    "Content-Disposition": `attachment; filename="${outputFileName}"`,
    "Access-Control-Allow-Origin": "*",
    "Access-Control-Allow-Methods": "GET, POST, OPTIONS"
  });
  const uniqueFiles = {};
  const archive = Archiver("zip", { zlib: 0 });
  archive.pipe(resp);
  archive
    .on("close", result => {
      console.log("archive stream event close : ", result);
      // archive.unpipe();
      // archive.destroy();
    })
    .on("drain", result => {
      console.log("archive stream event drain : ", result);
    })
    .on("entry", result => {
      console.log("archive stream event entry : ", result.stats);
    })
    .on("error", error => {
      console.log("archive stream event error : ", error);
      reject("error");
      // archive.unpipe();
      // archive.destroy();
    })
    .on("finish", result => {
      console.log("archive stream event finish : ", result);
      // archive.unpipe();
      // archive.destroy();
    })
    .on("pipe", result => {
      console.log("archive stream event pipe : ");
    })
    .on("progress", async result => {
      console.log("archive stream event progress : ", result.entries);
      if (urls.length === result.entries.total && urls.length === result.entries.processed) {
        await archive.finalize();
        console.log("finalized : ", urls[0]);
      }
    })
    .on("unpipe", result => {
      console.log("archive stream event unpipe : ");
    })
    .on("warning", result => {
      console.log("archive stream event warning : ", result);
    });
  for (const elem of urls) {
    await handleEntries({ elem, uniqueFiles, archive, speedLimit });
  }
};
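For context, this is roughly how useArchiver is invoked from the HTTP handler; the URL, file names, and port here are placeholders, not my real data:

const http = require("http");

// placeholder wiring: each element of urls carries { fileName, url, decipher }
http.createServer(async (req, resp) => {
  const urls = [
    { fileName: "a.pdf", url: "https://example.com/a.pdf", decipher: null }
  ];
  await useArchiver({ resp, urls, speedLimit: 0, outputFileName: "files.zip" });
}).listen(8080);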

I tried this code with archiver and I get the drain event from archiver while zipping large files. Does pipe handle backpressure or not? If it does, why am I getting the drain event from archiver?
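From the Node.js docs, my understanding is that "drain" only fires after a write() call has returned false and the destination's buffer has emptied again, so seeing it during large zips may be normal rather than a sign that pipe() is broken. This is the manual pattern that pipe() automates, as a generic sketch (not archiver-specific):

// generic helper: write a chunk and, if the destination's buffer is full,
// wait for "drain" before writing the next one
const writeWithBackpressure = (stream, chunk) =>
  new Promise(resolve => {
    if (stream.write(chunk)) return resolve(); // buffer still had room
    stream.once("drain", resolve);             // buffer was full, wait for it to empty
  });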


2 Answers


Hey) I looked into your code and I can tell you that you are using a promise function but you never wait for it to finish. You need to wrap zipStreamer.entry in await new Promise(). It has to look like this:

async function doSmth() {
  const decipher = crypto.createDecipheriv(
    algorithm,
    Buffer.from(key),
    Buffer.from(key.substring(0, 9))
  );
  await new Promise((resolve, reject) => {
    zipStreamer.entry(readableStream.pipe(decipher), {
      name: fileName
    }, (error, result) => {
      if (!error) {
        resolve("done");
      } else {
        reject("error");
      }
    });
  });
}
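The entries can then be awaited one after another, roughly like this; the zip-stream import and the final finish() call are my assumptions about the surrounding setup:

const ZipStream = require("zip-stream");   // assumed import
const zipStreamer = new ZipStream();

// add entries sequentially: wait for each entry's callback before queuing the next
const addAll = async files => {            // files: [{ stream, name }]
  for (const file of files) {
    await new Promise((resolve, reject) => {
      zipStreamer.entry(file.stream, { name: file.name }, error =>
        error ? reject(error) : resolve()
      );
    });
  }
  zipStreamer.finish();                    // assumed call that closes the archive
};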
Answered 2020-11-14T14:14:48.550

It seems I found the solution. I made some changes to the Kubernetes configuration, namely increasing the timeout from 30 seconds to 300 seconds and raising the CPU limit, and tested it several times with files up to 12-13 GB; it works like a charm. I think raising the CPU from 0.5 to 1 together with increasing the timeout did the job for me.
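For illustration, this is roughly where those two settings live in my setup; the names are placeholders and routing the timeout through a BackendConfig is an assumption about this cluster:

# fragment of the Deployment's container spec: CPU raised from 0.5 to 1
containers:
  - name: zip-service          # placeholder container name
    resources:
      requests:
        cpu: "1"
      limits:
        cpu: "1"
---
# GKE load balancer backend timeout raised from 30 s to 300 s
apiVersion: cloud.google.com/v1
kind: BackendConfig
metadata:
  name: zip-download-config    # placeholder name
spec:
  timeoutSec: 300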

Answered 2020-11-20T15:17:00.230