0

我正在编写一个小型解析器来使用节点流(实际上是 io.js,但我认为这并不重要)来处理一些日志文件。

我正在按照文档中的示例进行 unshift来解析标题。我可以成功拆分缓冲区并获取标题,但是一旦我调用stream.unshift它,它似乎就会连接标题字符串和剩余的字符串。

在为这个问题设置一些示例代码时,我发现当我查看基于文件的流时会发生这种行为。每当我使用基于字符串的流时,即使文件具有与字符串完全相同的文本,问题也不会发生。

这是我的文本编辑器中打开了空白字符的文件的样子,(用于比较): 示例日志文件的文本编辑器输出

我需要一些帮助来理解为什么会这样。

var StringDecoder = require('string_decoder').StringDecoder;

// setup string based stream in fake_stream
    var Stream = require('stream');
    var fake_file = 'FILE_TYPE:SOME-HEADER-DATE\r\n'
                  + 'HEADER_END\r\n'
                  + '1234|logged data|1|2|3|4|5|some other logged data\x1E\r\n'
                  + '1235|logged data|1|2|3|4|5|some other logged data\x1E\r\n'
                  + '1236|logged data|1|2|3|4|5|some other logged data\x1E\r\n'
    var fake_stream = new Stream.Readable();
    fake_stream.push(new Buffer(fake_file, 'utf8'));
    fake_stream.push(null);


// setup file based stream in file_stream
// the file minimal_test_log.glf has the text shown above (with the control characters unescaped)
    var fs = require('fs');
    var file = 'C:\\Some\\Path\\To\\minimal_test_log.glf';
    var file_stream = fs.createReadStream(file);



// WHY AM I GETTING DIFFERENT RESULTS HERE?

    parseHeader(file_stream, function(err, header, stream) {
        console.log('processing file_stream: ' + header.length);
        // RESULTS:  processing file_stream: 184
        // this results in the both parts concatenated without the HEADER_END/r/n
    });

    parseHeader(fake_stream, function(err, header, stream) {
        console.log('processing fake_stream: ' + header.length);
        // RESULTS:  processing fake_stream: 28
        // these results are what i would expect, everything before HEADER_END
    });



// Slightly modified example found at https://iojs.org/api/stream.html#stream_readable_unshift_chunk

function parseHeader(stream, callback) {
    stream.on('error', callback);
    stream.on('readable', onReadable);

    var decoder = new StringDecoder('utf8');
    var header = '';

    function onReadable() {

        var chunk, buf, remaining;
        var header_boundary = /HEADER_END\r\n/g;

        while (null !== (chunk = stream.read())) {

            var str = decoder.write(chunk);

            if (str.match(header_boundary)) {

                var split = str.split(header_boundary);
                header += split.shift();

                remaining = split.join('');
                buf = new Buffer(remaining, 'utf8');

                if (buf.length) {
                    stream.unshift(buf);
                }

                // the header length is different starting at this point

                stream.removeListener('error', callback);
                stream.removeListener('readable', onReadable);

                callback(null, header, stream);

            } else {
                header += str;
            }
        }
    }
}

4

1 回答 1

0

所以添加一个计数器onReadable表明它被调用了两次。由于header声明的范围比 更广泛onReadable,因此它保留了所存储的任何内容header。第二次通过该onReadable函数, header_boundary将不匹配,并且 if 语句将子句短路else,将日志的其余部分附加到header.

我重读了有关该事件的文档readable,并了解到

一旦内部缓冲区被耗尽,readable当更多数据可用时将再次触发事件

我相信这就是我打电话时发生的事情stream.unshift(buf);readable每当我将数据添加回流中时,都会触发第二个事件unshift

readable对于基于“文件”的流和基于“字符串”的流,都会触发第二个事件。然而,他们之间的时间似乎有点不同。在基于“字符串”的流中,当第二个readable事件被触发时,回调已经执行。在基于“文件”的流中,直到readable事件触发并且额外的数据被附加到header变量之后才会执行回调。

我还没有完全弄清楚为什么流之间的时间不同,但它给了我足够的时间继续我的工作。

于 2015-03-09T16:05:54.417 回答