我正在尝试编写一种非常实用的方式。我们使用 Highland.js 来管理流处理,但是因为我是新手,我想我对如何处理这种独特的情况感到非常困惑。
这里的问题是文件流中的所有数据都不一致。文件中的第一行通常是标题,我们希望将其存储到内存中,然后压缩流中的所有行。
这是我的第一次尝试:
var _ = require('highland');
var fs = require('fs');
var stream = fs.createReadStream('./data/gigfile.txt');
var output = fs.createWriteStream('output.txt');
var headers = [];
var through = _.pipeline(
_.split(),
_.head(),
_.doto(function(col) {
headers = col.split(',');
return headers;
}),
......
_.splitBy(','),
_.zip(headers),
_.wrapCallback(process)
);
_(stream)
.pipe(through)
.pipe(output);
管道中的第一个命令是按行拆分文件。接下来抓取标题,doto 将其声明为全局变量。问题是流中的下几行不存在,因此进程被阻止......可能是因为它上面的 head() 命令。
我尝试了其他一些变体,但我觉得这个例子让你知道我需要去哪里。
对此的任何指导都会有所帮助——它还提出了一个问题,即如果我在每一行中都有不同的值,我如何才能在许多可变长度/复杂度的不同流操作中分裂流程流。
谢谢。
编辑:我产生了更好的结果,但我质疑它的效率——有没有办法可以优化它,所以每次运行时我都不检查是否记录了标题?这感觉还是很草率的。
var through = _.pipeline(
_.split(),
_.filter(function(row) {
// Filter out bogus values
if (! row || headers) {
return true;
}
headers = row.split(',');
return false;
}),
_.map(function(row) {
return row.split(',')
}),
_.batch(500),
_.compact(),
_.map(function(row) {
return JSON.stringify(row) + "\n";
})
);
_(stream)
.pipe(through)