1

我试图了解如何在 fast-csv 中使用 Fiber 来制作逐行阅读器(单用户命令行脚本),该阅读器在每一行暂停读取/处理,直到该行完成各种异步调用. (不滚动我自己的 csv 代码,我想使用一些已经弄清楚关于 csv 格式的问题的东西)

如果我这样做

var csv = require("fast-csv");

var CSV_STRING = 'a,b\n' +
'a1,b1\n' +
'a2,b2\n';

csv
.fromString(CSV_STRING, {headers: false})
.on("record", function (data) {
    console.log("line="+JSON.stringify(data));
    setTimeout(function(){
        console.log("timeout");
    },2000);
})
.on("end", function () {
    console.log("done parsing CSV records");
});
console.log("done initializing csv parse");

我得到了我的期望:

done initializing csv parse
line=["a","b"]
line=["a1","b1"]
line=["a2","b2"]
done parsing CSV records
timeout
timeout
timeout

如果我尝试在每条记录后使用 Fiber

Fiber(
    function () {
        var fiber = Fiber.current;

        csv
            .fromString(CSV_STRING, {headers: false})
            .on("record", function (data) {
                console.log("line="+JSON.stringify(data));
                setTimeout(function(){
                    console.log("timeout");
                    fiber.run();
                },2000);
                Fiber.yield();
            })
            .on("end", function () {
                console.log("done parsing CSV records");
            });
        console.log("done initializing csv parse");
    }).run();

我明白了

done initializing csv parse
line=["a","b"]
events.js:141
      throw er; // Unhandled 'error' event
      ^

Error: yield() called with no fiber running

我想我明白发生了什么,Fiber().run() 中的代码完成了,所以它在调用 yield 之前离开了 Fiber,所以当它达到 yield 时不再有 Fiber。(因此聪明的错误消息“没有光纤运行”)

在我完成解析之前,让我保持光纤运行的合适方法是什么?

似乎是一个简单的问题,但我没有看到明显的答案?起初,我想在它离开 Future().run() 之前设置一个 yield,但这不起作用,因为第一个 fiber.run() 会让它再次离开 Fiber。

我想要的是流程是这样的:

done initializing csv parse
line=["a","b"]
timeout
line=["a1","b1"]
timeout
line=["a2","b2"]
timeout
done parsing CSV records

但如果不重新处理 fast-csv 的内部,这可能是不可能的,因为它控制了每个记录何时触发事件。我目前的想法是,必须让每个事件在 fast-csv 中被触发,并让处理 csv.on("record") 中的事件的用户将控制权交还给快速解析 csv 的循环-csv。

4

2 回答 2

0

节点:v5.4.0

好吧,这是获得这种行为的一种方法。我使用 es6 生成器逐行读取原始文件,然后使用 fast-csv 库上的生成器从逐行读取中解析原始字符串,这会导致非异步执行流程和类似于旧单的输出用户命令行脚本。

'use strict';
var csv = require("fast-csv");
var sfs = require('./sfs');

function parse(line) {
    csv
        .fromString(line, {headers: false})
        .on("record", function (data) {
            it.next(data);
        });
}

function *main() {
    // Make sure to initialize with a max buffer big enough to span any possible line length.  Otherwise undefined
    var fs = new sfs(it, 4096);
    var result=yield fs.open("data.csv");

    var line;

    while((line=yield fs.readLine()) != null) {
        console.log("line="+line);

        var csvData=yield parse(line);
        console.log("value1="+csvData[0]+" value2="+csvData[1]);
    }

    console.log("DONE");
}

var it = main();
it.next(); // get it all started

连同一个 quacky (quick and hacky) 类来包装我需要的 fs 东西。我确信有一种更好的方法来做我所做的事情,但它可以满足我的需要。

sfs.js

'use strict';
var fs=require('fs')

class sfs {
    constructor(it, maxbufsize) {
        this.MAX_BUF=maxbufsize;
        this.it=it;
        this.fd=null;
        this.lineBuf="";
        this.buffer=new Buffer(this.MAX_BUF);
        this.buflen=0;
    }

    open(file) {
        var parent=this;
        fs.open(file,'r',function(err,fd){
            parent.fd=fd;
            var parent2=parent;
            fs.fstat(fd,function(err, stats){
                parent2.stats=stats;
                parent2.it.next(stats);
            })
        })

    }

    readLine(){
        var parent = this;
        var i=0
        var s=this.stats.size
        var line="";
        var index=this.MAX_BUF-this.buflen;

        // read data into buffer, buffer may already have data from previous read that was shifted left over extracted line
        fs.read(this.fd,this.buffer,this.MAX_BUF-index,index,null,function(err,len,buf){
            var expectedReadLen=parent.MAX_BUF-parent.buflen;
            if(len < expectedReadLen) {  // If we didn't read enough to backfill buffer, lets make sure the string is terminated
                // as it shifts left so we don't try interpret older records to the right
                parent.buffer.fill(' ',parent.buflen+len,parent.MAX_BUF);
            }
            parent.buflen+=len; // whatever was in buffer has more now

            index=parent.buffer.indexOf('\n');

            if(index > -1) {
                line=parent.buffer.toString('utf8',0,index);
                buf.copy(parent.buffer,0,index+1,parent.buflen); // shift unused data left
                parent.buflen-=(index+1); // buffer left over after removing /n terminated line
                if(len<expectedReadLen) {  // If we didn't read enough to backfill buffer, lets make sure we erase old data
                    parent.buffer.fill(' ',parent.buflen,parent.MAX_BUF);
                }
            } else {
                if(parent.buflen > 0) {
                    line=parent.buffer.toString('utf8',0,parent.buflen);
                    parent.buflen=0;
                } else {
                    line=null;
                }
            }
            parent.it.next(line);
        });
    }

    close() {
        fs.close(this.fd);
    }
}

module.exports=sfs;
于 2016-01-28T23:46:18.720 回答
0

流是可暂停/可恢复的:

var csv = require("fast-csv");

var CSV_STRING = 'a,b\n' +
    'a1,b1\n' +
    'a2,b2\n';

var stream = csv.fromString(CSV_STRING, { headers: false })
    .on("data", function (data) {
        // pause the stream
        stream.pause();
        console.log("line: " + JSON.stringify(data));
        setTimeout(function () {
            // all async stuff are done, resume the stream
            stream.resume();
            console.log("timeout");
        }, 2000);
    }).on("end", function () {
        console.log("done parsing CSV records");
    });

控制台输出几乎正是您想要的:

/*
line: ["a","b"]
timeout
line: ["a1","b1"]
timeout
line: ["a2","b2"]
done parsing CSV records
timeout
*/

我能问一下为什么你绝对需要同步读取你的 csv 吗?

于 2016-01-29T08:51:18.300 回答