1

我正在从 NodeJS 中的流中读取数据,然后使用转换流中的异步函数处理该数据。我希望这个转换流能够并行启动对异步函数的多个调用,但它似乎一次只调用一个。

为了说明我的期望,我在下面编写了一个小程序,该程序从0up生成数字limit - 1,然后将其传递给一个转换流,该转换流以一个小的延迟递增每个数字。如果您运行下面的程序,数字 1 到 20 将按顺序记录,所有这些都有一点延迟。

我本来希望它们以 16 + 4 的块记录,因为默认highWaterMark值为 16。是否有可能获得我想要的行为,如果可以,如何?

即读取流将非常快地生成数据,转换速度较慢,但​​应该接收到高水位线,然后等待其数据已被处理,然后从读取流中请求更多。

const stream = require('stream')
const limit = 20
let index = 0

const numberStream = new stream.Readable({
  objectMode: true,
  read (amount) {
    const innerLimit = Math.min(index + amount, limit)
    while (index < innerLimit) {
      this.push(index++)
    }
    if (index === limit) {
      this.push(null)
    }
  },
})

const delayedIncStream = new stream.Transform({
  objectMode: true,
  transform (item, _, cb) {
    setTimeout(() => cb(null, item + 1), 100)
  },
})

const resultStream = numberStream.pipe(delayedIncStream)

resultStream.on('data', console.log)
4

2 回答 2

2

答案是否定的,如文档本节最后一部分所述:https ://nodejs.org/api/stream.html#stream_transform_transform_chunk_encoding_callback

transform._transform() 永远不会被并行调用;流实现了队列机制,为了接收下一个块,必须调用回调,同步或异步。

于 2019-05-21T07:12:04.767 回答
2

您可以使用 nodejs 包parallel-transform-stream来实现这一点,同时保留转换数据的顺序。

然后可以将您的示例重写如下,以并行转换所有数字:

const stream = require('stream')
const ParallelTransform = require('parallel-transform-stream').default
const limit = 20
let index = 0

const numberStream = new stream.Readable({
  objectMode: true,
  read (amount) {
    const innerLimit = Math.min(index + amount, limit)
    while (index < innerLimit) {
      this.push(index++)
    }
    if (index === limit) {
      this.push(null)
    }
  },
})

const delayedIncStream = new (ParallelTransform.create((item, _, cb) => {
  setTimeout(() => cb(null, item + 1), 100)
}))({
  objectMode: true,
  maxParallel: 20
})

const resultStream = numberStream.pipe(delayedIncStream)

resultStream.on('data', console.log)
于 2019-09-06T10:04:39.100 回答