使用可读流、转换流和可写流,我有一个奇怪的行为,我的 Writer.end() 在 _write 回调之前被调用。
在这里,我复制/粘贴了错误代码的示例:
'use strict';
var util = require('util');
var Readable = require('stream').Readable;
function Reader() {
Readable.call(this, { objectMode: true });
this.data = [{id: 1}];
}
util.inherits(Reader, Readable);
Reader.prototype._read = function () {
console.log('_read');
setTimeout(function() {
this.push(this.data.pop() || null);
}.bind(this), 1000);
};
var Transform = require('stream').Transform;
function StringifyStream(options) {
Transform.call(this, options);
this._writableState.objectMode = true;
this._readableState.objectMode = false;
}
util.inherits(StringifyStream, Transform);
StringifyStream.prototype._transform = function(chunk, encoding, cb) {
setTimeout(function() {
this.push(JSON.stringify(chunk));
cb();
}.bind(this), 100);
};
var Writable = require('stream').Writable;
function Writer() {
this.string = '';
Writable.call(this);
}
util.inherits(Writer, Writable);
Writer.prototype._write = function(chunk, encoding, callback) {
console.log('_write', arguments);
this.string += chunk;
setTimeout(function() {
console.log('write done')
callback();
}, 1000);
};
Writer.prototype.end = function(chunk, encoding, callback) {
console.log('end', arguments);
if(callback) callback();
};
var reader = new Reader();
var change = new Change();
var stringify = new StringifyStream();
var writer = new Writer();
reader.pipe(change).pipe(stringify).pipe(writer);
和输出:
_read
_read
_write { '0': <Buffer 7b 22 69 64 22 3a 31 2c 22 70 72 6f 75 74 22 3a 31 7d>,
'1': 'buffer',
'2': [Function] }
end {}
write done