1

我在流中进行异步调用时遇到问题。看起来由于某种原因,如果我有三个流并且中间流进行异步调用,最终流永远不会收到“结束”事件。我可以通过一些简单的直通流和超时来模拟行为。

var options = {}
var streamOne = function(){
  return through(function write(row){
    this.emit('data', row)       
  })
}

var streamTwo = function(){
  return through(function write(row){
    this.pause()
    var that = this;

    setTimeout(function(){
      that.emit('data', row)
      that.resume()
    }, 1000)
  })
}

options.streams = [new streamOne(), new streamTwo()]

然后我将它传递给event-stream

var streams = []
//Pass stream one and stream two to es.pipe
streams.push(es.pipe.apply(this, options.streams))

//Then add the final stream 
var endStream = es.pipe(
  new streamThree()
)
streams.push(endStream)

//And send it through the pipeline
es.pipeline.apply(this, streams)

因此,这在当前情况下不起作用。

几个令人困惑的点:如果我删除streamOne,它会起作用!如果 streamTwo 不进行异步调用,它可以工作。这让我认为问题在于两个流交互的方式。但是,如果我console.log在整个代码中,看起来一切正常,streamThree 将写入数据但从不注册“结束”事件。*注意:streamThree 不使用 through,而是使用原生 Streams 模块。

思考为什么会这样?

4

1 回答 1

2

运行一些测试用例后,看起来 I/O 没有被管道或直通正确处理。我不完全确定为什么会这样,但我认为这是导致问题的暂停和恢复流的竞争条件。我做了一些事情来清理代码:

1)简化管道。我没有在管道中进行嵌套es.pipe,而是直接将流放入。这有助于更好地管理流之间的数据流。

2)我没有从我的常规直通流中发出数据,而是使用 将数据排队this.queue,让模块处理潜在的背压。

3)我使用事件流方法es.map来处理异步调用的流程。我认为这是一个更好的解决方案,因为它更干净地处理了流的暂停和恢复,并且仍然返回一个直通流。

于 2013-09-11T18:34:15.827 回答