3

我正在处理一个数据流,通常是从RxNode.fromReadableStream()一个中间步骤中提取的,该步骤是异步的,并且可能只能以一定的并发性运行,同时输出步骤只能以串行、离散的批次运行。

基本上是一个经典的转换管道:

cat file | parallel -P5 transform | xargs -n1000 process

有了承诺,它会是这样的(非常简化):

lines.on('readable', function read() {
  let line;
  while (line = lines.read()) {
    queue.push(line);
    if (queue.length >= 10) {
      return Promise.map(queue, item => someAsyncTransformations(item), {concurrency: 5})
        .then(items => someFinalAsyncProcessing(items))
        .then(read);
    }
  }
});

通过在转换后插入控制器,我可以确保转换独立执行,从而实现流水线化;而延迟和合并的结合似乎让我可以控制异步性和并发性。

但是,我controlled()用来确保 map 步骤有效地暂停处理直到完成。它看起来真的很难看——难道没有更好的、非手动的方式来实现无损背压吗?

另外,我相信我必须对转换器使用相同的控制技巧,否则整个流将一次性转换并消耗内存。有没有更惯用的方法?

最后:如果输入的大小不能被批量大小整除,则上述内容实际上不起作用。出于某种原因,如果我request()处理的项目数量超过了剩余项目的数量,整个管道就会退出;onComplete甚至没有被调用。实际上,如果我最初request()用一个很大的数字调用,整个管道似乎无限期地重复相同的内容。错误或预期行为?

编辑我发现我可以使用concatMap. 没关系。

4

0 回答 0