18

我正在编写一个模块,它是一个可写流。我想为我的用户实现管道接口。

如果发生一些错误,我需要暂停可读流并发出错误事件。然后,用户将决定 - 如果他可以接受错误,他应该能够恢复数据处理。

var writeable = new BackPressureStream();
writeable.on('error', function(error){
    console.log(error);
    writeable.resume();
});

var readable = require('fs').createReadStream('somefile.txt');
readable.pipe.(writeable);

我看到节点为我们提供了readable.pause()方法,可以用来暂停可读流。但我无法从可写流模块中调用它:

var Writable = require('stream').Writable;

function BackPressureStream(options) {
    Writable.call(this, options);
}
require('util').inherits(BackPressureStream, Writable);

BackPressureStream.prototype._write = function(chunk, encoding, done) {
    done();
};

BackPressureStream.prototype.resume = function() {
    this.emit('drain');
}

如何在可写流中实现背压?

PS 可以使用pipe/unpipe提供可读流作为参数的事件。但也有人说,对于管道流,暂停的唯一机会是将可读流从可写流中分离出来。

我做对了吗?在用户调用恢复之前,我必须解除可写流的管道?在用户调用恢复之后,我应该将可读流返回吗?

4

3 回答 3

2

您所描述的内容已经由pipe方法本身实现。来自文档中的写作部分的错误:

如果流在发出错误时通过Readable管道进入Writable流,则流将被取消管道化。WritableReadable

因此,作为可写流的实现者,您唯一的工作就是实现该_write方法并在发生错误时发出错误。取消管道将由 Stream 模块自动处理。然后,如果模块的使用者认为错误是非关键的,则将可读流返回是他们的工作。以下是他们如何做到这一点:

var writeable = new BackPressureStream();
var readable = require('fs').createReadStream('somefile.txt');

writeable.on('error', function(error) {
    // use pipe again, if error is not critical
    if (!error.critical) {
        readable.pipe(writeable);
    } else {
        readable.destroy(error);
    }
});

readable.pipe(writeable);

在你的模块里面:

BackPressureStream.prototype._write = function(chunk, encoding, done) {
    // call done with an error to emit 'error' event and unpipe readable stream
    done(new Error('BOOM'));
};
于 2018-08-10T12:08:04.113 回答
1

无需访问源流或与源流交互。原生 NodeJS 流现在支持背压和缓冲。并pipe()照顾两者。

您只需要_write()正确实施。

function _write(chunk, enc, callback) {
    // if you don't invoke callback, data is buffered, and writes paused when buffer is full
}

引用文档:

在调用 writable._write() 和调用回调之间发生的所有对 writable.write() 的调用都会导致写入的数据被缓冲。

callback()转发错误后,在用户确认继续之前不要调用下一个块。这将导致数据从源到缓冲区。

当 writable.write(chunk) 方法被重复调用时,数据被缓冲在 Writable 流中。当内部写入缓冲区的总大小低于 highWaterMark 设置的阈值时,对 writable.write() 的调用将返回 true。一旦内部缓冲区的大小达到或超过 highWaterMark,将返回 false。

可写流的缓冲区已满后,调用write()将返回 false。如果源流实现表现良好或者是原生节点流,它会自动停止write()更多数据。

于 2018-04-11T12:55:48.240 回答
0

基本上,据我了解,您希望在发生错误事件的情况下对流施加背压。你有几个选择。

首先,正如您已经确定的那样,使用pipe抓取读取流的实例并做一些花哨的步法。

另一种选择是创建一个提供此功能的包装可写流(即,它将 aWritableStream作为输入,并且在实现流函数时,将数据传递给提供的流。

基本上你最终会得到类似的东西

source stream -> wrapping writable -> writable

https://nodejs.org/api/stream.html#stream_implementing_a_writable_stream处理实现可写流。

对您而言,关键是如果底层可写中发生错误,您将在流上设置一个标志,并且下一次调用write发生时,您将缓冲块,存储回调并仅调用。就像是

// ...
constructor(wrappedWritableStream) {
    wrappedWritableStream.on('error', this.errorHandler);
    this.wrappedWritableStream = wrappedWritableStream;
}
// ...
write(chunk, encoding, callback) {
    if (this.hadError) {
        // Note: until callback is called, this function won't be called again, so we will have maximum one stored
        //  chunk.
        this.bufferedChunk = [chunk, encoding, callback];
    } else {
        wrappedWritableStream.write(chunk, encoding, callback);
    }
}
// ...
errorHandler(err) {
    console.error(err);
    this.hadError = err;
    this.emit(err);
}
// ...
recoverFromError() {
    if (this.bufferedChunk) {
        wrappedWritableStream.write(...this.bufferedChunk);
        this.bufferedChunk = undefined;
    }
    this.hadError = false;
}

注意:您应该只需要实现该write功能,但我鼓励您挖掘并尝试其他实现功能。

还值得注意的是,您在写入已发出错误事件的流时可能会遇到一些麻烦,但我将把它作为一个单独的问题留给您解决。

这是另一个关于背压的好资源 https://nodejs.org/en/docs/guides/backpressuring-in-streams/

于 2017-11-23T08:50:58.367 回答