2

我有 3 个流(A、B、C),它们通过管道传输到另一个(A->B->C)。当我启动我的程序 B 的 _read get 时,立即调用它,因为它通过管道传输到 C。但是 B 流中还没有数据,因为 A 异步获取数据。一旦 B 接收到传递给 B 的 _write 方法的数据,它就会转换数据并发出一个“可读”事件(我手动触发 - 这是应该的方式吗?)。

然而,没有任何反应,任何人都不会使用来自 B 的数据(因此不会调用 B 的 _read)。我可以通过在我的 _write() 方法末尾调用(在 B 上)this._read() 来解决这个问题。但这也可能会将数据推送给消费者,尽管队列已满,对吧?

基本上我想将较大的数据块发送到 B 流中,将其拆分为较小的数据块,然后将它们一个一个地传递给 C。所以我想在B中有某种缓冲区。

_read(size) {
    if(this._lineBuffer.length > 0) {
        var stop = false;
        while(!stop) {
            stop = this.push(this._lineBuffer.splice(0,1));
        }
    }
    if(this._pendingWriteAck) {
        this._pendingWriteAck();
        this._pendingWriteAck = null;
    }
}

_write(chunk, encoding, callback) {
    console.log("New chunk for line splitter received");
    if(this._buffer) {
        this._buffer = Buffer.concat([this._buffer, chunk]);
    } else {
        this._buffer = chunk;
    }
    for (; this._offset < this._buffer.length; this._offset++) {
        if (this._buffer[this._offset] === 0x0a) { // 0x0a is a new line
            this._lineBuffer.push(this._buffer.slice(0, this._offset).toString());
            this._buffer = this._buffer.slice(this._offset + 1);
            this._offset = 0;
        }
    }

    if(this._lineBuffer.length > this._maxLineBuffer || this._buffer.length > this._maxRawBuffer) {
        console.log("Line Split buffer has reached capacity. Waiting...");
        this._pendingWriteAck = callback;
    } else {
        callback();
    }

    setImmediate(()=>{
        this.emit('readable');
        this._read();
    })
}
4

1 回答 1

0

您可以将 Transform 流用于“B”流:

const Transform = require('stream').Transform;

const B = new Transform({
  transform(chunk, encoding, callback) {
    this._offset = this._offset || 0;
    this._buffer = this._buffer ? Buffer.concat([this._buffer, chunk]) : chunk
    for (; this._offset < this._buffer.length; this._offset++) {
      if (this._buffer[this._offset] === 0x0a) { // 0x0a is a new line
        if (this._offset) {
          this.push(this._buffer.slice(0, this._offset), encoding);
        }
        this._buffer = this._buffer.slice(this._offset + 1);
        this._offset = 0;
      }
    }
    callback()
  },
  flush(callback) {
    if (this._buffer && this._buffer.length) {
      this.push(this._buffer);
    }
    callback();
  }
});

您可以通过执行以下操作来查看它的工作原理:

let line = 0
B.on('data', (chunk) => process.stdout.write(`${++line}. ${chunk}\n`))
B.write(['Foo', 'Bar', 'Baz', 'Qux', 'Hello '].join('\n'))
B.write(['World', 'The End'].join('\n'))
B.end()

终端的输出将是:

1. Foo
2. Bar
3. Baz
4. Qux
5. Hello World
6. The End
于 2017-04-04T14:41:50.553 回答