0

我使用 busboy 作为中间件,在我的 Express 服务器中以流的方式接收带有 CSV 文件的表单数据。这些 CSV 文件可能带有数量不定的额外配置参数,因此我需要先解析第一行,算出参数的个数,然后才能建立通向 csv-parser 的管道。我的做法大致如下:

// HTML Form parser middleware for dealing with file uploads.
//
// For every uploaded file the first line is read to count its comma-separated
// columns, the matching header list is built, and the file is then piped into
// the csv parser.
//
// NOTE: known limitation of this approach: the "data" listener used to read
// the first line consumes those chunks, so whatever was read before
// `file.pipe(...)` is attached is never delivered to the csv parser.
router.post("*", (req: Request, res: Response, next: NextFunction) => {

    const busboy = new Busboy({ headers: req.headers });

    busboy.on("file", (fieldname, file, filename, encoding, mimetype) => {
        // Number of CSV columns; stays undefined until the first line is complete.
        let paramsLen: number | undefined;

        // First line read so far. Kept outside the data callback in case the
        // first line is split over multiple data chunks.
        let firstLine = "";

        return new Promise<number>(resolve => {
            file.on("end", () => {
                console.log("File [" + fieldname + "] Finished");

                // The file ended without any newline: everything read so far
                // is the first (and only) line.
                if (paramsLen === undefined) {
                    paramsLen = firstLine.split(",").length;
                    resolve(paramsLen);
                }
            });

            file.on("data", data => {
                console.log("File [" + fieldname + "] got " + data.length + " bytes");
                if (paramsLen !== undefined) {
                    return;
                }

                const strChunk: string = data.toString();

                // Look for a real line feed. (JavaScript regular expressions
                // have no `\Z` anchor; inside a character class it would match
                // a literal "Z" and cut the line short.)
                const newlineAt = strChunk.indexOf("\n");
                if (newlineAt === -1) {
                    // long line. continue reading in next data chunk
                    firstLine += strChunk;
                    return;
                }

                firstLine += strChunk.slice(0, newlineAt);
                paramsLen = firstLine.split(",").length;

                // paramsLen now found! init pipe to csv writeable
                resolve(paramsLen);
            });
        })
        .then(columnCount => {
            const headers: string[] = [
                "id",
                "brand",
                "product",
                "serialNumber",
                "site",
                "area",
                "location",
                "longitude",
                "latitude",
            ];

            // add one c<N>/v<N> header pair for every two columns beyond the
            // fixed ones, now that the column count has been discovered
            let cNum = 1;
            for (let i = headers.length; i < columnCount; i = i + 2) {
                headers.push(`c${cNum}`);
                headers.push(`v${cNum}`);
                cNum++;
            }

            file.pipe(
                csv({
                    headers,
                }),
            );
        })
        // The emitter ignores the returned promise, so hand failures to Express
        // explicitly instead of leaving an unhandled rejection.
        .catch((err: unknown) => next(err));
    });

    busboy.on("finish", () => {
        console.log("Done parsing form!");
        if (!importingDevicesFromCsv) {
            fulfill();
        }
    });

    req.pipe(busboy);
})

问题是,当 Promise 兑现(resolve)时,文件可读流已经消费了部分甚至全部文件数据,这意味着这些数据块永远不会传给 csv 解析流。那么,考虑到可能需要预先读取多个数据块,我怎样才能在建立通向 csv 解析器的管道之前先读取流数据、却又不把它消费掉呢?

4

1 回答 1

0

我的解决方案是创建一个 Promise,其中包装一个转换流(Transform):它读取数据但不把数据消费掉,而是将数据保存在一个数组中(包括释放回调)。一旦得到 paramsLen,转换对象就会兑现这个 Promise,随后建立管道,最后把转换流中保留的数据排出。见下文:

// HTML Form parser middleware for dealing with file uploads.
//
// Each uploaded file is piped through a Transform that holds incoming chunks
// back until the first line has been seen. The number of comma-separated
// columns on that line decides how many extra c<N>/v<N> headers are needed;
// once known, the Transform is piped into the csv parser and the held chunks
// are released, so the parser receives the file from its very first byte.
//
// Limitations visible in this code: columns are counted by splitting on ","
// (quoted fields containing commas are not recognised), and a file with no
// line feed at all is held in memory in full before it is released.
router.post("*", (req: Request, res: Response, next: NextFunction) => {

    const busboy = new Busboy({ headers: req.headers });

    busboy.on("file", (fieldname, file, filename, encoding, mimetype) => {
        file.on("end", () => {
            console.log("File [" + fieldname + "] Finished");
        });

        file.on("data", data => {
            console.log("File [" + fieldname + "] got " + data.length + " bytes");
        });

        // What the sniffing stage hands over once the column count is known.
        type Sniffed = {
            stream: Transform;
            paramsLen: number;
            drain(): void;
        };

        return new Promise<Sniffed>(resolve => {
            // Chunks received before the pipe to the csv parser exists.
            const heldChunks: Buffer[] = [];

            // Raw bytes of the first line; decoded only once complete so a
            // multi-byte character split across chunks is not mangled.
            const firstLineParts: Buffer[] = [];

            // Pending transform/flush callback. Withholding it stops the
            // Transform from being handed more input until drain() runs.
            let resume: (() => void) | undefined;

            // True once the held chunks have been released downstream.
            let drainDone = false;

            // First line complete: park the stream and publish the result.
            const announce = (done: () => void): void => {
                resume = done;
                const firstLine = Buffer.concat(firstLineParts).toString();
                resolve({
                    stream,
                    paramsLen: firstLine.split(",").length,
                    drain,
                });
            };

            // Release everything held so far, in arrival order, then let the
            // Transform carry on as a plain pass-through.
            const drain = (): void => {
                for (const chunk of heldChunks) {
                    stream.push(chunk);
                }
                heldChunks.length = 0;
                drainDone = true;

                if (resume !== undefined) {
                    const done = resume;
                    resume = undefined;
                    done();
                }
            };

            const stream = new Transform({
                transform: (data: Buffer, enc, callback) => {
                    // if drain finished pass data straight through
                    if (drainDone) {
                        callback(null, data);
                        return;
                    }

                    heldChunks.push(data);

                    const newlineAt = data.indexOf(0x0a); // "\n"
                    if (newlineAt === -1) {
                        // long line. Acknowledge the chunk (without emitting
                        // it) so the next one is delivered; keeping the
                        // callback here would stall the stream.
                        firstLineParts.push(data);
                        callback();
                        return;
                    }

                    firstLineParts.push(data.subarray(0, newlineAt));
                    announce(() => callback());
                },
                flush: callback => {
                    if (drainDone) {
                        callback();
                        return;
                    }
                    // Input ended without a line feed: what was read so far is
                    // the whole first line.
                    announce(() => callback());
                },
            });

            file.pipe(stream);
        })
        .then(ts => {
            const headers: string[] = [
                "id",
                "brand",
                "product",
                "serialNumber",
                "site",
                "area",
                "location",
                "longitude",
                "latitude",
            ];

            // add one c<N>/v<N> header pair for every two columns beyond the
            // fixed ones, now that paramsLen has been discovered
            let cNum = 1;
            for (let i = headers.length; i < ts.paramsLen; i = i + 2) {
                headers.push(`c${cNum}`);
                headers.push(`v${cNum}`);
                cNum++;
            }

            ts.stream.pipe(
                csv({
                    headers,
                }),
            );

            // drain transform stream (only after the pipe exists, so nothing is lost)
            ts.drain();
        })
        // The emitter ignores the returned promise, so hand failures to Express
        // explicitly instead of leaving an unhandled rejection.
        .catch((err: unknown) => next(err));
    });

    busboy.on("finish", () => {
        console.log("Done parsing form!");
        if (!importingDevicesFromCsv) {
            fulfill();
        }
    });

    req.pipe(busboy);
})
于 2020-06-18T04:15:20.917 回答