0

我有一个 NodeJS 流转换,它需要在输入结束时输出一些内部状态。

作为一个类比,考虑将输入数据拆分为行的转换。当传入的数据结束时,必须输出剩下的任何(未换行终止的)数据。

我怎样才能做到这一点?

(我试过 _flush,但是当我write(null)进入转换时它没有被调用。)

更新:

var Transform = require('stream').Transform;
var util = require('util');

exports.createLinesTransform = createLinesTransform;

function createLinesTransform(options) {
  return new LinesTransform(options);
}

function LinesTransform(options) {
  options = options ? options : {};
  options.objectMode = true;
  Transform.call(this, options);
  this._buf = '';
  this._last_src = undefined;
}
util.inherits(LinesTransform, Transform);

LinesTransform.prototype._transform = function(chunk, encoding, done) {
  console.log('chunk', chunk, '_buf', this._buf);
  this._buf += chunk.payload;
  for (var i = 0; i < this._buf.length; i++) {
    if (this._buf.charAt(i) === '\n') {
      this.push({src: chunk.src, payload: this._buf.slice(0, i)});
      this._last_src = chunk.src;
      this._buf = this._buf.slice(i + 1);
    }
  }
  done();
}

// this doesn't get called when the input stream ends
LinesTransform.prototype._flush = function(done) {
  console.log('_flush');
  this.push({src: this._last_src, payload: this._buf});
  done();
}

和一个测试:

  it('should make a working LinesTransform', function() {
    var lines = createLinesTransform();
    var rxd = [];
    lines.on('data', function(data) {
      console.log('data', data);
      rxd.push(data);
    });

    var ret = lines.write({src:{},payload:'hel'});
    assert.deepEqual([], rxd);
    ret = lines.write({src:{},payload:'lo'});
    assert.deepEqual([], rxd);
    lines.write({src:{},payload:' world!\na second'});
    assert.deepEqual([{"src":{},"payload":"hello world!"}], rxd);
    lines.write({src:{},payload:'line\n'});
    assert.deepEqual([{"src":{},"payload":"hello world!"},
                      {"src":{},"payload":"a secondline"}],
                     rxd);
    lines.write({src:{},payload:'and some trailing data'});
    assert.deepEqual([{"src":{},"payload":"hello world!"},
                      {"src":{},"payload":"a secondline"}],
                     rxd);
    lines.write(null);
    lines.end();
    // this last assert fails
    assert.deepEqual([{"src":{},"payload":"hello world!"},
                      {"src":{},"payload":"a secondline"},
                      {"src":{},"payload":"and some trailing data"}],
                     rxd);
  });
4

1 回答 1

1

_flush是正确的做法,我需要在测试中添加一个短暂的延迟以使其正常工作。

于 2013-10-21T08:07:50.807 回答