
I have been trying to use a readable stream and a transform stream to process a very large file. The problem I keep running into is that if I don't put a writable stream at the end, the program seems to terminate before the result is returned.

Example: rstream.pipe(split()).pipe(tstream)

My tstream has an emitter that emits when a counter hits a threshold. When that threshold is set to a low number, I get a result, but when it's high, it's not returning anything. If I pipe it to a file writer, it always returns a result. Am I missing something obvious?

code:

// Dependencies
var fs = require('fs');
var rstream = fs.createReadStream('file');
var wstream = fs.createWriteStream('output');
var split = require('split'); // used for separating stream by new line
var QTransformStream = require('./transform');

var qtransformstream = new QTransformStream();
qtransformstream.on('completed', function(result) {
    console.log('Result: ' + result);
});
exports.getQ = function getQ(filename, callback) {

    // THIS WORKS if I have a low counter for qtransformstream,
    // but when it's high, I do not get a result
    //   rstream.pipe(split()).pipe(qtransformstream);

    // this always works
    rstream.pipe(split()).pipe(qtransformstream).pipe(wstream);

};

Here is the code for the Qtransformstream

// Dependencies
var Transform = require('stream').Transform,
    util = require('util');
// Constructor, takes in the Quser as an input
var TransformStream = function(Quser) {
    // Create this as a Transform Stream
    Transform.call(this, {
        objectMode: true
    });
    // Default the Qbase to 32 as an assumption
    this.Qbase = 32;
    if (Quser) {
        this.Quser = Quser;
    } else {
        this.Quser = 20;
    }
    this.Qpass = this.Quser + this.Qbase;
    this.Counter = 0;
    // Variables used as intermediates
    this.Qmin = 120;
    this.Qmax = 0;
};
// Extend the transform object
util.inherits(TransformStream, Transform);
// The Transformation to get the Qbase and Qpass
TransformStream.prototype._transform = function(chunk, encoding, callback) {
    var Qmin = this.Qmin;
    var Qmax = this.Qmax;
    var Qbase = this.Qbase;
    var Quser = this.Quser;
    this.Counter++;
    // Stop the stream after 100 reads and emit the data
    if (this.Counter === 100) {
        this.emit('completed', this.Qbase, this.Quser);
    }
    // do some calcs on this.Qbase

    this.push('something not important');
    callback();
};
// export the object
module.exports = TransformStream;
3 Answers


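Edit:

Also, I don't know how high your counter goes, but if you fill up the buffer, data stops being passed to the transform stream. In that case the 'completed' event is never emitted, because you never reach the counter limit. Try changing your highWaterMark.

Edit 2: A better explanation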

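As you know, a transform stream is a duplex stream, which means it can accept data from a source and send data on to a destination. These are its writable side and its readable side, respectively. The transform stream inherits from both the readable stream and the writable stream implemented by Node.js. There is one caveat, though: a transform stream does not have to implement the _read or _write functions itself, because it implements _transform instead. In this sense you can think of it as a relative of the lesser-known PassThrough stream.

Since a transform stream has a writable side, keep in mind that a write stream always has a destination to dump its contents into. The problem you are having is that when you create a transform stream, you can't specify a place to send its content. The only way to pass data completely through your transform stream is to have something consume its output, typically by piping it to a write stream. Otherwise your stream backs up and can't accept more data, because there is no place for the data to go.

This is why it always works when you pipe to a write stream. The write stream relieves the backup by sending the data on to a destination, so all of your data flows through and the 'completed' event is emitted.

Your code works without the write stream when the threshold is small because you aren't filling the stream's buffers, so the transform stream can accept enough data for the threshold to be reached. As the threshold increases, the amount of data the stream can hold without passing it on (to a write stream) stays the same. The stream backs up and can no longer accept data, which means the 'completed' event is never emitted.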
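The stall described above can be reproduced with a minimal sketch. It assumes a Node.js version that has Readable.from (12 or later); countProcessed, the chunk contents, and the sizes are hypothetical names and values chosen for illustration, not part of the original code.

```javascript
const { Readable, Transform, Writable } = require('stream');

// Hypothetical helper: pipes `total` objects into a counting Transform and
// resolves with the number of chunks that actually reached its transform().
function countProcessed(total, attachSink) {
  return new Promise((resolve) => {
    let processed = 0;
    const counter = new Transform({
      objectMode: true,
      transform(chunk, encoding, callback) {
        processed++;
        callback(null, chunk); // hand the chunk to the readable side
      }
    });

    const items = Array.from({ length: total }, (_, i) => 'line ' + i);
    Readable.from(items).pipe(counter);

    if (attachSink) {
      // A sink that discards every chunk, comparable to writing to /dev/null.
      const sink = new Writable({
        objectMode: true,
        write(chunk, encoding, callback) {
          callback();
        }
      });
      counter.pipe(sink).on('finish', () => resolve(processed));
    } else {
      // Nothing reads from `counter`, so it stalls and never finishes.
      // Sample the count after a short delay instead.
      setTimeout(() => resolve(processed), 100);
    }
  });
}

countProcessed(1000, false).then((n) => console.log('without a sink:', n));
countProcessed(1000, true).then((n) => console.log('with a sink:', n));
```

Without the sink, the count should stop well short of 1000, roughly where the internal buffers fill up (the default highWaterMark in object mode is 16 objects per side). With the sink, all 1000 chunks get processed.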

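I would venture that if you increased the highWaterMark of the transform stream, you could raise your threshold and the code would still work. That is not the right fix, though. Instead, pipe your stream to a write stream that sends the data to /dev/null. You can create that write stream like this: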

var writer = fs.createWriteStream('/dev/null');

The section on buffering in the Node.js documentation explains the behavior you are running into.
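If /dev/null is not available (for example on Windows), another way to give the transform stream a consumer is to call resume() on it, which switches the readable side to flowing mode and discards the output. This is a sketch under assumptions, not the asker's actual stream: runWithResume, the threshold, and the chunk contents are hypothetical, and Readable.from requires Node.js 12 or later.

```javascript
const { Readable, Transform } = require('stream');

// Hypothetical helper: resolves with the counter value once the
// 'completed' event fires, draining the transform with resume().
function runWithResume(total, threshold) {
  return new Promise((resolve, reject) => {
    let counter = 0;
    const tstream = new Transform({
      objectMode: true,
      transform(chunk, encoding, callback) {
        counter++;
        if (counter === threshold) {
          this.emit('completed', counter);
        }
        callback(null, chunk);
      }
    });

    tstream.on('completed', resolve);
    tstream.on('error', reject);

    const items = Array.from({ length: total }, (_, i) => 'line ' + i);
    Readable.from(items).pipe(tstream);

    // No 'data' listener and no pipe target: resume() reads and drops
    // the output so the buffers never fill up.
    tstream.resume();
  });
}

runWithResume(5000, 3000).then((count) => console.log('Result: ' + count));
```

The design choice is the same as piping to /dev/null: the output is thrown away either way, but resume() does not need a file handle.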

Answered 2015-08-28T03:41:03.543

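I would suggest using a Writable instead of a Transform stream. Rename _transform to _write, and your code will consume the stream when you pipe into it. As @Bradgnar already pointed out, a transform stream needs a consumer; otherwise it stops the readable stream from pushing more data into its buffer.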
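A rough sketch of that change, assuming the stream only needs to count lines and emit an event. CountingSink, runSink, and the threshold are hypothetical names and values, and the Q calculations from the question are left out.

```javascript
const { Readable, Writable } = require('stream');

// Hypothetical Writable version of the counting stream from the question.
class CountingSink extends Writable {
  constructor(threshold) {
    super({ objectMode: true });
    this.threshold = threshold;
    this.counter = 0;
  }

  // Same role as _transform in the question, but nothing is pushed onward,
  // so no downstream consumer is needed.
  _write(chunk, encoding, callback) {
    this.counter++;
    if (this.counter === this.threshold) {
      this.emit('completed', this.counter);
    }
    callback();
  }
}

// Hypothetical helper: resolves with the value passed to 'completed',
// or null if the threshold was never reached.
function runSink(total, threshold) {
  return new Promise((resolve, reject) => {
    const sink = new CountingSink(threshold);
    let reached = null;
    sink.on('completed', (count) => { reached = count; });
    sink.on('finish', () => resolve(reached));
    sink.on('error', reject);

    const items = Array.from({ length: total }, (_, i) => 'line ' + i);
    Readable.from(items).pipe(sink);
  });
}

runSink(5000, 3000).then((result) => console.log('Result: ' + result));
```

Because a Writable is the end of the pipeline, backpressure is released as soon as each _write callback runs, so the threshold can be arbitrarily high.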

Answered 2015-09-03T17:48:11.040

You don't interrupt _transform, so processing carries on long past the threshold. Try:

this.emit('completed', ...);
this.end();

That is why "the program seems to terminate before the result gets returned".

And don't output any useless data:

var wstream = fs.createWriteStream('/dev/null');

Good luck :)

Answered 2015-08-31T13:51:45.817