我正在处理一个数据流,通常是从RxNode.fromReadableStream()
一个中间步骤中提取的,该步骤是异步的,并且可能只能以一定的并发性运行,同时输出步骤只能以串行、离散的批次运行。
基本上是一个经典的转换管道:
cat file | parallel -P5 transform | xargs -n1000 process
有了承诺,它会是这样的(非常简化):
lines.on('readable', function read() {
let line;
while (line = lines.read()) {
queue.push(line);
if (queue.length >= 10) {
return Promise.map(queue, item => someAsyncTransformations(item), {concurrency: 5})
.then(items => someFinalAsyncProcessing(items))
.then(read);
}
}
});
通过在转换后插入控制器,我可以确保转换独立执行,从而实现流水线化;而延迟和合并的结合似乎让我可以控制异步性和并发性。
但是,我controlled()
用来确保 map 步骤有效地暂停处理直到完成。它看起来真的很难看——难道没有更好的、非手动的方式来实现无损背压吗?
另外,我相信我必须对转换器使用相同的控制技巧,否则整个流将一次性转换并消耗内存。有没有更惯用的方法?
最后:如果输入的大小不能被批量大小整除,则上述内容实际上不起作用。出于某种原因,如果我request()
处理的项目数量超过了剩余项目的数量,整个管道就会退出;onComplete
甚至没有被调用。实际上,如果我最初request()
用一个很大的数字调用,整个管道似乎无限期地重复相同的内容。错误或预期行为?
编辑:我发现我可以使用. 没关系。concatMap