15

前提

我正在尝试找到正确的方法来过早终止 Node.js 中的一系列管道流(管道):有时我想在流完成之前优雅地中止它。具体来说,我正在处理主要objectMode: true和非本地并行流,但这并不重要。

问题

问题是当我unpipe使用管道时,数据保留在每个流的缓冲区中并被drain编辑。这对于大多数中间流(例如/ )来说可能没问题,但最后一个流仍然会流向其写入目标(例如文件或数据库或套接字或 w/e)。如果缓冲区包含数百或数千个需要大量时间来耗尽的块,这可能会出现问题。我希望它立即停止,即不流失;为什么要在无关紧要的数据上浪费周期和内存?ReadableTransformWritable

根据我走的路线,我收到“结束后写入”错误,或者当流找不到现有管道时出现异常。

问题

优雅地终止表单中的流管道的正确方法是什么a.pipe(b).pipe(c).pipe(z)

解决方案?

我想出的解决方案是 3 步:

  1. unpipe管道中的每个流以相反的顺序
  2. 清空实现的每个流的缓冲区Writable
  3. end实现的每个流Writable

一些说明整个过程的伪代码:

var pipeline = [ // define the pipeline
  readStream,
  transformStream0,
  transformStream1,
  writeStream
];

// build and start the pipeline
var tmpBuildStream;
pipeline.forEach(function(stream) {
    if ( !tmpBuildStream ) {
        tmpBuildStream = stream;
        continue;
    }
    tmpBuildStream = lastStream.pipe(stream);
});

// sleep, timeout, event, etc...

// tear down the pipeline
var tmpTearStream;
pipeline.slice(0).reverse().forEach(function(stream) {
    if ( !tmpTearStream ) {
        tmpTearStream = stream;
        continue;
    }
    tmpTearStream = stream.unpipe(tmpTearStream);
});

// empty and end the pipeline
pipeline.forEach(function(stream) {
  if ( typeof stream._writableState === 'object' ) { // empty
    stream._writableState.length -= stream._writableState.buffer.length;
    stream._writableState.buffer = [];
  }
  if ( typeof stream.end === 'function' ) { // kill
    stream.end();
  }
});

我真的很担心stream._writableState内部bufferlength属性的使用和修改(_表示私有属性)。这似乎是一个黑客。另请注意,由于我是管道,pause因此resume我们不可能(基于我从 IRC 收到的建议)。

我还整理了一个可以从 github 获取的可运行版本(相当草率):https ://github.com/zamnuts/multipipe-proto (git clone,npm install,查看自述文件,npm start)

4

1 回答 1

2

在这种特殊情况下,我认为我们应该摆脱具有 4 个不同的未完全定制的流的结构。如果我们没有实现自己的机制,将它们连接在一起将产生难以控制的链依赖。

我想在这里关注您的实际目标:

 INPUT >----[read] → [transform0] → [transform1] → [write]-----> OUTPUT
               |          |              |            |
 KILL_ALL------o----------o--------------o------------o--------[nothing to drain]

我相信上面的结构可以通过结合自定义来实现:

  1. duplex stream - 对于自己_write(chunk, encoding, cb)_read(bytes)实施

  2. transform stream- 用于自己的_transform(chunk, encoding, cb)实施。

由于您正在使用该writable-stream-parallel软件包,您可能还想查看它们的库,因为它们的duplex实现可以在这里找到:https ://github.com/Clever/writable-stream-parallel/blob/master/lib/duplex.js 。他们的transform stream实现在这里:https ://github.com/Clever/writable-stream-parallel/blob/master/lib/transform.js 。他们在这里处理highWaterMark

可能的解决方案

他们的写入流:https ://github.com/Clever/writable-stream-parallel/blob/master/lib/writable.js#L189有一个有趣的功能writeOrBuffer,我想你可以稍微调整一下以中断写入缓冲区中的数据。

注意:这3 个标志控制缓冲区清除:

( !finished && !state.bufferProcessing && state.buffer.length )

参考:

于 2015-03-18T12:34:56.350 回答