3

我正在尝试编写一种非常实用的方式。我们使用 Highland.js 来管理流处理,但是因为我是新手,我想我对如何处理这种独特的情况感到非常困惑。

这里的问题是文件流中的所有数据都不一致。文件中的第一行通常是标题,我们希望将其存储到内存中,然后压缩流中的所有行。

这是我的第一次尝试:

var _      = require('highland');
var fs     = require('fs');
var stream = fs.createReadStream('./data/gigfile.txt');
var output = fs.createWriteStream('output.txt');

var headers = [];

var through = _.pipeline(
    _.split(),
    _.head(),
    _.doto(function(col) {
        headers = col.split(',');
        return headers;
    }),

    ......

    _.splitBy(','),
    _.zip(headers),
    _.wrapCallback(process)
);

_(stream)
    .pipe(through)
    .pipe(output);

管道中的第一个命令是按行拆分文件。接下来抓取标题,doto 将其声明为全局变量。问题是流中的下几行不存在,因此进程被阻止......可能是因为它上面的 head() 命令。

我尝试了其他一些变体,但我觉得这个例子让你知道我需要去哪里。

对此的任何指导都会有所帮助——它还提出了一个问题,即如果我在每一行中都有不同的值,我如何才能在许多可变长度/复杂度的不同流操作中分裂流程流。

谢谢。

编辑:我产生了更好的结果,但我质疑它的效率——有没有办法可以优化它,所以每次运行时我都不检查是否记录了标题?这感觉还是很草率的。

var through = _.pipeline(
    _.split(),
    _.filter(function(row) {
        // Filter out bogus values
        if (! row || headers) {
            return true;
        }
        headers = row.split(',');
        return false;
    }),
    _.map(function(row) {
        return row.split(',')
    }),
    _.batch(500),
    _.compact(),
    _.map(function(row) {
        return JSON.stringify(row) + "\n";
    })
);

_(stream)
    .pipe(through)
4

1 回答 1

4

您可以使用Stream.observe()Stream.fork()来拆分流。

var _      = require('highland');
var fs     = require('fs');
var stream = fs.createReadStream('./data/gigfile.txt');
var output = fs.createWriteStream('output.txt');
var through = highland.pipeline(function(s) {
    var headerStream, headers;
    // setup a shared variable to store the headers
    headers = [];
    // setup the csv processing
    s = s
        // split input into lines
        .split()
        // remove empty lines
        .compact()
        // split lines into arrays
        .map(function(row) {
            return row.split(',');
        });
    // create a new stream to grab the header
    headerStream = s.observe();
    // pause the original stream
    s.pause();
    // setup processing of the non-header rows
    s = s
        // drop the header row
        .drop(1)
        // convert the rest of the rows to objects
        .map(function(row) {
            var obj = headers.reduce(function(obj, key, i) {
                obj[key] = row[i];
                return obj;
            }, {});
            return JSON.stringify(obj) + "\n";
        });
    // grab the first row from the header stream
    // save the headers and then resume the normal stream
    headerStream.head().toArray(function(rows) {
        headers = rows[0];
        s.resume();
    });
    return s;
});
_(stream)
    .pipe(through)
    .pipe(output);

话虽如此,您的 csv 解析并未考虑在您的值中转义换行符和逗号。通常,这是在 csv 文件中通过将值括在双引号中来完成的。然后通过将两个并排放置来转义双引号。要做到这一点有点棘手,所以我建议使用处理它的包,例如fast-csv

然后您的代码可能如下所示:

var _      = require('highland');
var fs     = require('fs');
var csv    = require('fast-csv');
var stream = fs.createReadStream('./data/gigfile.txt');
var output = fs.createWriteStream('output.txt');

_(stream.pipe(csv({headers: true, ignoreEmpty: true})))
    .map(function(row) {
        return JSON.stringify(row) + "\n";
    })
    .pipe(output);
于 2015-07-01T15:46:51.873 回答