7

我正在尝试使用 RxJS 编写一个脚本来处理数百个日志文件,每个日志文件大约 1GB。脚本的骨架看起来像

Rx.Observable.from(arrayOfLogFilePath)
.flatMap(function(logFilePath){
   return Rx.Node.fromReadStream(logFilePath)
   .filter(filterLogLine)
})
.groupBy(someGroupingFunc)
.map(someFurtherProcessing)
.subscribe(...)

该代码有效,但请注意所有日志文件的过滤步骤将同时开始。但是,从文件系统 IO 性能的角度来看,最好是一个接一个地处理一个文件(或者至少将并发限制为几个文件,而不是同时打开所有数百个文件)。在这方面,我如何以“功能反应方式”实现它?

我曾想过调度程序,但无法弄清楚它如何在这里提供帮助。

4

2 回答 2

14

您可以使用.merge(maxConcurrent)来限制并发。因为.merge(maxConcurrent)将 metaobservable(可观察的 observable)扁平化为 observable,所以您需要将 替换为.flatMap.map以便输出是 metaobservable(“unflat”),然后调用.merge(maxConcurrent).

Rx.Observable.from(arrayOfLogFilePath)
.map(function(logFilePath){
   return Rx.Node.fromReadStream(logFilePath)
   .filter(filterLogLine)
})
.merge(2) // 2 concurrent 
.groupBy(someGroupingFunc)
.map(someFurtherProcessing)
.subscribe(...)

此代码尚未经过测试(因为我无权访问您拥有的开发环境),但这是继续进行的方法。RxJS 没有很多带有并发参数的操作符,但是你几乎总是可以用.merge(maxConcurrent).

于 2014-09-30T17:34:06.550 回答
1

我刚刚用 RxJs 5 解决了一个类似的问题,所以我希望这个解决方案可以帮助其他有类似问题的人。

// Simulate always processing 2 requests in parallel (when one is finished it starts processing one more),
// retry two times, push error on stream if retry fails.

//const Rx = require('rxjs-es6/Rx');

// -- Global variabel just to show that it works. --
let parallelRequests = 0;
// --------------------------------------------------

function simulateRequest(req) {
    console.log("Request " + req);
    // --- To log retries ---
    var retry = 0;
    // ----------------------

    // Can't retry a promise, need to restart before the promise is made.
    return Rx.Observable.of(req).flatMap(req => new Promise((resolve, reject) => {

        var random = Math.floor(Math.random() * 2000);
        // -- To show that it works --
        if (retry) {
            console.log("Retrying request " + req + " ,retry " + retry);
        } else {

            parallelRequests++;
        }
        // ---------------------------
        setTimeout(() => {
            if (random < 900) {
                retry++;
                return reject(req + " !!!FAILED!!!");
            }

            return resolve(req);
        }, random);
    })).retry(2).catch(e => Rx.Observable.of(e));
}

Rx.Observable.range(1, 10)
    .flatMap(e => simulateRequest(e), null, 2)
    // -- To show that it works --
    .do(() => {
        console.log("ParallelRequests " + parallelRequests);
        parallelRequests--;
    })
    // ---------------------------
    .subscribe(e => console.log("Response from request " + e), e => console.log("Should not happen, error: " + e), e => console.log("Finished"));
<script src="https://npmcdn.com/@reactivex/rxjs@5.0.0-beta.6/dist/global/Rx.umd.js"></script>

于 2016-08-22T14:19:48.240 回答