2

我正在玩流和异步/等待功能。到目前为止,我所拥有的是:

let logRecord = ((record, callback) => {
     console.log(record);
     return callback();
});

let importCSVfromPath = async((csv_path) => {
    return new Promise(function(resolve, reject) {
        var parser = parse();
        var input = fs.createReadStream(csv_path);
        var transformer = transform(logRecord, {parallel: 1});

        input.on('error', (err) => {
            reject(err);
        });
        input.on('finish', ()=> {
            resolve();
        });

        input.pipe(parser).pipe(transformer);
    });
});

现在我想用 importRecord 替换 logRecord。问题是这个函数必须使用已经是异步堆栈一部分的函数。

let importRecord = async( (record) => {
    .......
    await(insertRow(row));
});

这样做的正确方法是什么?

4

1 回答 1

1

它比这稍微复杂一些 - node.js 流不适应(至少还没有)到 es7 async/await方法。

如果您想自己开发,请考虑编写一个派生自Readable stream的类。实现基于 Promise 的接口是一项艰巨的任务,但它是可能的。

如果您对使用许可的许可框架感到满意,请查看Scramjet。有了它,您的代码将如下所示(大部分示例是解析 CSV - 我将在下一个版本中添加一个帮助程序):

fs.createReadStream("file.csv")        // open your file
    .pipe(new StringStream())          // pass to scramjet
    .split("\n")                       // split by line
    .parse((line) => line.split(","))  // convert lines to arrays
    .map(async (line) => {             // run asynchrounous mapping
        await importRecord(line);      // import log to DB
        return logRecord(line);        // return some log for the output
    })
    .pipe(process.stdout);             // pipe the output wherever you like

我相信这正是您正在寻找的东西,它将并行运行您的记录导入,同时保持输出顺序。

于 2017-11-06T21:18:59.240 回答