我有 3 个流(A、B、C),它们通过管道传输到另一个(A->B->C)。当我启动我的程序 B 的 _read get 时,立即调用它,因为它通过管道传输到 C。但是 B 流中还没有数据,因为 A 异步获取数据。一旦 B 接收到传递给 B 的 _write 方法的数据,它就会转换数据并发出一个“可读”事件(我手动触发 - 这是应该的方式吗?)。
然而,没有任何反应,任何人都不会使用来自 B 的数据(因此不会调用 B 的 _read)。我可以通过在我的 _write() 方法末尾调用(在 B 上)this._read() 来解决这个问题。但这也可能会将数据推送给消费者,尽管队列已满,对吧?
基本上我想将较大的数据块发送到 B 流中,将其拆分为较小的数据块,然后将它们一个一个地传递给 C。所以我想在B中有某种缓冲区。
_read(size) {
if(this._lineBuffer.length > 0) {
var stop = false;
while(!stop) {
stop = this.push(this._lineBuffer.splice(0,1));
}
}
if(this._pendingWriteAck) {
this._pendingWriteAck();
this._pendingWriteAck = null;
}
}
_write(chunk, encoding, callback) {
console.log("New chunk for line splitter received");
if(this._buffer) {
this._buffer = Buffer.concat([this._buffer, chunk]);
} else {
this._buffer = chunk;
}
for (; this._offset < this._buffer.length; this._offset++) {
if (this._buffer[this._offset] === 0x0a) { // 0x0a is a new line
this._lineBuffer.push(this._buffer.slice(0, this._offset).toString());
this._buffer = this._buffer.slice(this._offset + 1);
this._offset = 0;
}
}
if(this._lineBuffer.length > this._maxLineBuffer || this._buffer.length > this._maxRawBuffer) {
console.log("Line Split buffer has reached capacity. Waiting...");
this._pendingWriteAck = callback;
} else {
callback();
}
setImmediate(()=>{
this.emit('readable');
this._read();
})
}