前提
我正在尝试找到正确的方法来过早终止 Node.js 中的一系列管道流(管道):有时我想在流完成之前优雅地中止它。具体来说,我正在处理主要objectMode: true
和非本地并行流,但这并不重要。
问题
问题是当我unpipe
使用管道时,数据保留在每个流的缓冲区中并被drain
编辑。这对于大多数中间流(例如/ )来说可能没问题,但最后一个流仍然会流向其写入目标(例如文件或数据库或套接字或 w/e)。如果缓冲区包含数百或数千个需要大量时间来耗尽的块,这可能会出现问题。我希望它立即停止,即不流失;为什么要在无关紧要的数据上浪费周期和内存?Readable
Transform
Writable
根据我走的路线,我收到“结束后写入”错误,或者当流找不到现有管道时出现异常。
问题
优雅地终止表单中的流管道的正确方法是什么a.pipe(b).pipe(c).pipe(z)
?
解决方案?
我想出的解决方案是 3 步:
unpipe
管道中的每个流以相反的顺序- 清空实现的每个流的缓冲区
Writable
end
实现的每个流Writable
一些说明整个过程的伪代码:
var pipeline = [ // define the pipeline
readStream,
transformStream0,
transformStream1,
writeStream
];
// build and start the pipeline
var tmpBuildStream;
pipeline.forEach(function(stream) {
if ( !tmpBuildStream ) {
tmpBuildStream = stream;
continue;
}
tmpBuildStream = lastStream.pipe(stream);
});
// sleep, timeout, event, etc...
// tear down the pipeline
var tmpTearStream;
pipeline.slice(0).reverse().forEach(function(stream) {
if ( !tmpTearStream ) {
tmpTearStream = stream;
continue;
}
tmpTearStream = stream.unpipe(tmpTearStream);
});
// empty and end the pipeline
pipeline.forEach(function(stream) {
if ( typeof stream._writableState === 'object' ) { // empty
stream._writableState.length -= stream._writableState.buffer.length;
stream._writableState.buffer = [];
}
if ( typeof stream.end === 'function' ) { // kill
stream.end();
}
});
我真的很担心stream._writableState
内部buffer
和length
属性的使用和修改(_
表示私有属性)。这似乎是一个黑客。另请注意,由于我是管道,pause
因此resume
我们不可能(基于我从 IRC 收到的建议)。
我还整理了一个可以从 github 获取的可运行版本(相当草率):https ://github.com/zamnuts/multipipe-proto (git clone,npm install,查看自述文件,npm start)