我在流中进行异步调用时遇到问题。看起来由于某种原因,如果我有三个流并且中间流进行异步调用,最终流永远不会收到“结束”事件。我可以通过一些简单的直通流和超时来模拟行为。
var options = {}
var streamOne = function(){
return through(function write(row){
this.emit('data', row)
})
}
var streamTwo = function(){
return through(function write(row){
this.pause()
var that = this;
setTimeout(function(){
that.emit('data', row)
that.resume()
}, 1000)
})
}
options.streams = [new streamOne(), new streamTwo()]
然后我将它传递给event-stream。
var streams = []
//Pass stream one and stream two to es.pipe
streams.push(es.pipe.apply(this, options.streams))
//Then add the final stream
var endStream = es.pipe(
new streamThree()
)
streams.push(endStream)
//And send it through the pipeline
es.pipeline.apply(this, streams)
因此,这在当前情况下不起作用。
几个令人困惑的点:如果我删除streamOne,它会起作用!如果 streamTwo 不进行异步调用,它可以工作。这让我认为问题在于两个流交互的方式。但是,如果我console.log
在整个代码中,看起来一切正常,streamThree 将写入数据但从不注册“结束”事件。*注意:streamThree 不使用 through,而是使用原生 Streams 模块。
思考为什么会这样?