0

我正在尝试通过加入 2 个 csv 输入流来生成输出文件,对于 csv 1 中的每条记录,我想为 csv 2 中的每条记录生成一个输出。

我在浏览任何类似解决方案的堆栈溢出时遇到了高地,并遇到了:

Highland.js 中的嵌套流操作

我试图将其调整为我自己的问题,到目前为止有这个:

    const debug = require('debug')('csvparse');
    const csv = require('fast-csv');
    const fs = require('fs');
    const args = process.argv;
    const h = require('highland');

    const typestream = h(fs.createReadStream(args[2]).pipe(csv({ headers: true, ignoreEmpty: true })));
    const postcodestream = h(fs.createReadStream(args[3]).pipe(csv({ headers: true, ignoreEmpty: true })));

    const pipeline = typestream.flatMap((type) => {
        debug(type);

        return postcodestream.flatMap((postcode) => {
            debug(postcode);

            return h([`${type.type}-${postcode.postcode}\n`]);
        });
    });

    pipeline.pipe(process.stdout);

使用以下示例输入 csv1:

type,
STREET,
ROAD,

csv2:

postcode,
3456
3446
1234

我期望输出

STREET-3456
STREET-3446
STREET-1234
ROAD-3456
ROAD-3446
ROAD-1234

但我只是得到:

STREET-3456
STREET-3446
STREET-1234

我可以从调试语句中看到我退出 ROAD 一次然后它就停止了。

4

1 回答 1

0

好的,我发现了我的问题,基本上我应该一直使用 through 进行 csv 解析而不是包装管道,我也应该在初始 flatMap 中创建 fs.createReadStream 而不是从变量中引用它(因为流将在初始迭代后完成)。

代码现在是:

#!/usr/bin/node
const debug = require('debug')('csvparse');
const csv = require('fast-csv');
const fs = require('fs');
const args = process.argv;
const h = require('highland');

const pipeline = h(fs.createReadStream(args[2]))
    .through(csv({ headers: true, ignoreEmpty: true }))
    .flatMap((type) => {
        return h(fs.createReadStream(args[3]))
            .through(csv({ headers: true, ignoreEmpty: true }))
            .map((postcode) => {
                return `${type.type}-${postcode.postcode}\n`;
            });
    });

pipeline.pipe(process.stdout);
于 2019-04-05T00:38:33.687 回答