1

在组合使用可读流(Readable)、转换流(Transform)和可写流(Writable)时,我遇到了一个奇怪的现象:我的 Writer.end() 在 _write 的回调执行之前就被调用了。

下面是我复制粘贴过来的、能够复现该问题的示例代码:

'use strict';

var util = require('util');
var Readable = require('stream').Readable;
/**
 * Object-mode readable source backed by a small in-memory queue.
 * It emits the queued records one at a time and then signals end-of-stream.
 */
function Reader() {
  Readable.call(this, { objectMode: true });
  // Records still waiting to be emitted; _read takes them from the tail.
  this.data = [{id: 1}];
}
util.inherits(Reader, Readable);

/**
 * Logs the read request, then after a 1000 ms timer pushes the next queued
 * record, or null (end-of-stream) when nothing is left to pop.
 */
Reader.prototype._read = function () {
  var self = this;
  console.log('_read');
  setTimeout(function pushNext() {
    var next = self.data.pop();
    // A falsy pop result (empty queue) is turned into the null terminator.
    self.push(next ? next : null);
  }, 1000);
};

var Transform = require('stream').Transform;
/**
 * Transform stream that takes objects on its writable side and emits their
 * JSON text on its readable side.
 *
 * @param {Object} [options] Forwarded unchanged to the Transform constructor.
 */
function StringifyStream(options) {
  Transform.call(this, options);
  // NOTE(review): these two assignments reach into the stream's internal
  // state objects. The documented alternative is the writableObjectMode /
  // readableObjectMode constructor options; kept as-is here so buffering
  // behavior is not changed.
  this._writableState.objectMode = true;
  this._readableState.objectMode = false;
}
util.inherits(StringifyStream, Transform);

/**
 * Serializes one incoming chunk to JSON after a 100 ms timer and pushes the
 * resulting text downstream.
 *
 * @param {*} chunk Value to serialize.
 * @param {string} encoding Not used by this implementation.
 * @param {Function} cb Called with no arguments on success, or with the
 *     thrown error if serializing or pushing the chunk fails.
 */
StringifyStream.prototype._transform = function(chunk, encoding, cb) {
  var self = this;
  setTimeout(function() {
    try {
      self.push(JSON.stringify(chunk));
    } catch (err) {
      // This code runs from a timer, so a throw here would surface as an
      // uncaught exception and cb would never be called. Hand the error to
      // the stream through cb instead.
      cb(err);
      return;
    }
    cb();
  }, 100);
};

var Writable = require('stream').Writable;
/**
 * Writable sink that accumulates every chunk it receives into `this.string`.
 */
function Writer() {
  // Running concatenation of all chunks written so far.
  this.string = '';
  Writable.call(this);
}
util.inherits(Writer, Writable);

/**
 * Appends the chunk to `this.string` and acknowledges the write after a
 * 1000 ms timer.
 *
 * @param {*} chunk Data to append; it is string-coerced by `+=`.
 * @param {string} encoding Not used by this implementation.
 * @param {Function} callback Invoked with no arguments once the timer fires.
 */
Writer.prototype._write = function(chunk, encoding, callback) {
  console.log('_write', arguments);
  this.string += chunk;
  setTimeout(function() {
    console.log('write done');
    callback();
  }, 1000);
};

/**
 * Logs the end request and then delegates to the stock Writable#end.
 *
 * The log line is printed at the moment end() is called, which can be while
 * a write is still pending. Delegating is what defers `callback` until the
 * pending writes have been acknowledged and the stream has finished; use the
 * callback (or the 'finish' event) to learn when all data has been written.
 *
 * @param {*} [chunk] Optional final chunk to write.
 * @param {string} [encoding] Encoding of `chunk` when it is a string.
 * @param {Function} [callback] Invoked once the stream has finished.
 * @returns {*} Whatever Writable.prototype.end returns.
 */
Writer.prototype.end = function(chunk, encoding, callback) {
  console.log('end', arguments);
  // Forward the arguments untouched so the shorter call forms
  // (end(cb), end(chunk, cb)) keep working.
  return Writable.prototype.end.apply(this, arguments);
};

// Build the pipeline stages in order: source, intermediate stage,
// JSON serializer, sink.
var reader = new Reader();
// NOTE(review): Change is not defined in this excerpt; presumably a stream
// declared elsewhere. Confirm before running.
var change = new Change();
var stringify = new StringifyStream();
var writer = new Writer();

// Connect the stages one hop at a time:
// reader -> change -> stringify -> writer.
reader.pipe(change);
change.pipe(stringify);
stringify.pipe(writer);

运行后的输出如下:

_read
_read
_write { '0': <Buffer 7b 22 69 64 22 3a 31 2c 22 70 72 6f 75 74 22 3a 31 7d>,
  '1': 'buffer',
  '2': [Function] }
end {}
write done
4

0 回答 0