我有一个 NodeJS 流转换,它需要在输入结束时输出一些内部状态。
作为一个类比,考虑将输入数据拆分为行的转换。当传入的数据结束时,必须输出剩下的任何(未换行终止的)数据。
我怎样才能做到这一点?
(我试过 _flush,但是当我write(null)
进入转换时它没有被调用。)
更新:
var Transform = require('stream').Transform;
var util = require('util');
exports.createLinesTransform = createLinesTransform;
function createLinesTransform(options) {
return new LinesTransform(options);
}
function LinesTransform(options) {
options = options ? options : {};
options.objectMode = true;
Transform.call(this, options);
this._buf = '';
this._last_src = undefined;
}
util.inherits(LinesTransform, Transform);
LinesTransform.prototype._transform = function(chunk, encoding, done) {
console.log('chunk', chunk, '_buf', this._buf);
this._buf += chunk.payload;
for (var i = 0; i < this._buf.length; i++) {
if (this._buf.charAt(i) === '\n') {
this.push({src: chunk.src, payload: this._buf.slice(0, i)});
this._last_src = chunk.src;
this._buf = this._buf.slice(i + 1);
}
}
done();
}
// this doesn't get called when the input stream ends
LinesTransform.prototype._flush = function(done) {
console.log('_flush');
this.push({src: this._last_src, payload: this._buf});
done();
}
和一个测试:
it('should make a working LinesTransform', function() {
var lines = createLinesTransform();
var rxd = [];
lines.on('data', function(data) {
console.log('data', data);
rxd.push(data);
});
var ret = lines.write({src:{},payload:'hel'});
assert.deepEqual([], rxd);
ret = lines.write({src:{},payload:'lo'});
assert.deepEqual([], rxd);
lines.write({src:{},payload:' world!\na second'});
assert.deepEqual([{"src":{},"payload":"hello world!"}], rxd);
lines.write({src:{},payload:'line\n'});
assert.deepEqual([{"src":{},"payload":"hello world!"},
{"src":{},"payload":"a secondline"}],
rxd);
lines.write({src:{},payload:'and some trailing data'});
assert.deepEqual([{"src":{},"payload":"hello world!"},
{"src":{},"payload":"a secondline"}],
rxd);
lines.write(null);
lines.end();
// this last assert fails
assert.deepEqual([{"src":{},"payload":"hello world!"},
{"src":{},"payload":"a secondline"},
{"src":{},"payload":"and some trailing data"}],
rxd);
});