0

我对 Node 流很熟悉,但我在为抽象代码的最佳实践而苦苦挣扎,我在单个管道步骤中重用了很多代码。

这是我今天所写内容的精简版:

inputStream
.pipe(csv.parse({columns:true})
.pipe(csv.transform(function(row) {return transform(row); }))
.pipe(csv.stringify({header: true})
.pipe(outputStream);

实际工作发生在transform(). 唯一真正改变的是inputStream,transform()outputStream。就像我说的,这是我实际使用的精简版。我在每个管道步骤上都有很多错误处理和日志记录,这最终是我尝试抽象代码的原因。

我要写的是一个管道步骤,如下所示:

inputStream
.pipe(csvFunction(transform(row)))
.pipe(outputStream);

我正在努力理解的是如何将这些管道步骤转换为一个接受流并返回流的函数。我看过像 through2 这样的库,但我不确定这如何让我到达我想要去的地方。

4

2 回答 2

2

您可以PassThrough像这样使用该类:

var PassThrough = require('stream').PassThrough;

var csvStream = new PassThrough();
csvStream.on('pipe', function (source) {
  // undo piping of source
  source.unpipe(this);
  // build own pipe-line and store internally
  this.combinedStream =
    source.pipe(csv.parse({columns: true}))
      .pipe(csv.transform(function (row) {
        return transform(row);
      }))
      .pipe(csv.stringify({header: true}));
});

csvStream.pipe = function (dest, options) {
  // pipe internal combined stream to dest
  return this.combinedStream.pipe(dest, options);
};

inputStream
  .pipe(csvStream)
  .pipe(outputStream);
于 2017-02-20T20:57:54.370 回答
0

这就是我最终的结果。我使用through2库和csv 库的流 API来创建我正在寻找的管道函数。

var csv = require('csv');
    through = require('through2');

module.exports = function(transformFunc) {
    parser = csv.parse({columns:true, relax_column_count:true}),
    transformer = csv.transform(function(row) {
        return transformFunc(row);
    }),
    stringifier = csv.stringify({header: true});

    return through(function(chunk,enc,cb){
        var stream = this;

            parser.on('data', function(data){
                transformer.write(data);
            });

            transformer.on('data', function(data){
                stringifier.write(data);
            });

            stringifier.on('data', function(data){
                stream.push(data);
            });

            parser.write(chunk);

            parser.removeAllListeners('data');
            transformer.removeAllListeners('data');
            stringifier.removeAllListeners('data');
            cb();
    })
}

值得注意的是,我在最后删除了事件侦听器的部分,这是由于在我创建了太多事件侦听器的地方遇到了内存错误。我最初尝试通过监听事件来解决这个问题once,但这阻止了后续块被读取并传递到下一个管道步骤。

如果有人有反馈或其他想法,请告诉我。

于 2017-03-16T13:37:54.897 回答