我正在尝试使用 RxJS 编写一个脚本来处理数百个日志文件,每个日志文件大约 1GB。脚本的骨架看起来像
Rx.Observable.from(arrayOfLogFilePath)
.flatMap(function(logFilePath){
return Rx.Node.fromReadStream(logFilePath)
.filter(filterLogLine)
})
.groupBy(someGroupingFunc)
.map(someFurtherProcessing)
.subscribe(...)
该代码有效,但请注意所有日志文件的过滤步骤将同时开始。但是,从文件系统 IO 性能的角度来看,最好是一个接一个地处理一个文件(或者至少将并发限制为几个文件,而不是同时打开所有数百个文件)。在这方面,我如何以“功能反应方式”实现它?
我曾想过调度程序,但无法弄清楚它如何在这里提供帮助。