I have been trying to use a readable and a transform stream to process a very large file. The problem that I seem to come across is that if I don't put a writable stream at the end, the program seems to terminate before the result gets returned.
Example : rstream.pipe(split()).pipe(tstream)
My tstream
has an emitter that emits when a counter hits a threshold. When that threshold is set to a low number, I get a result, but when it's high, it's not returning anything. If I pipe it to a file writer, it always returns a result. Am I missing something obvious?
code:
// Dependencies
var fs = require('fs');
var rstream = fs.createReadStream('file');
var wstream = fs.createWriteStream('output');
var split = require('split'); // used for separating stream by new line
var QTransformStream = require('./transform');
var qtransformstream = new QTransformStream();
qtransformstream.on('completed', function(result) {
console.log('Result: ' + result);
});
exports.getQ = function getQ(filename, callback) {
// THIS WORKS if i have a low counter for qtransformstream,
// but when it's high, I do not get a result
// rstream.pipe(split()).pipe(qtransformstream);
// this always works
rstream.pipe(split()).pipe(qtransformstream).pipe(wstream);
};
Here is the code for the Qtransformstream
// Dependencies
var Transform = require('stream').Transform,
util = require('util');
// Constructor, takes in the Quser as an input
var TransformStream = function(Quser) {
// Create this as a Transform Stream
Transform.call(this, {
objectMode: true
});
// Default the Qbase to 32 as an assumption
this.Qbase = 32;
if (Quser) {
this.Quser = Quser;
} else {
this.Quser = 20;
}
this.Qpass = this.Quser + this.Qbase;
this.Counter = 0;
// Variables used as intermediates
this.Qmin = 120;
this.Qmax = 0;
};
// Extend the transform object
util.inherits(TransformStream, Transform);
// The Transformation to get the Qbase and Qpass
TransformStream.prototype._transform = function(chunk, encoding, callback) {
var Qmin = this.Qmin;
var Qmax = this.Qmax;
var Qbase = this.Qbase;
var Quser = this.Quser;
this.Counter++;
// Stop the stream after 100 reads and emit the data
if (this.Counter === 100) {
this.emit('completed', this.Qbase, this.Quser);
}
// do some calcs on this.Qbase
this.push('something not important');
callback();
};
// export the object
module.exports = TransformStream;