6

我想用 RxJs 实现 Time Expiry 缓存。这是“正常”缓存的示例:

//let this represents "heavy duty job"
var data = Rx.Observable.return(Math.random() * 1000).delay(2000);

//and we want to cache result
var cachedData = new Rx.AsyncSubject();
data.subscribe(cachedData);

cachedData.subscribe(function(data){
    //after 2 seconds, result is here and data is cached
    //next subscribe returns immediately data
    cachedData.subscribe(function(data2){ /*this is "instant"*/ });
});

subscribe第一次调用on时cachedData,调用“heavy duty job”,2 秒后结果保存在cachedData( AsyncSubject) 中。任何其他后续subscribeoncachedData立即返回并保存结果(因此缓存实现)。

我想要实现的是在cachedData有效的时间段内“调味”,当那个时间过去时,我想为新数据重新运行“繁重的工作”并再次缓存这个新的时间期等...

期望的行为:

//pseudo code
cachedData.youShouldExpireInXSeconds(10);


//let's assume that all code is sequential from here

//this is 1.st run
cachedData.subscribe(function (data) {
    //this first subscription actually runs "heavy duty job", and
    //after 2 seconds first result data is here
});

//this is 2.nd run, just after 1.st run finished
cachedData.subscribe(function (data) {
    //this result is cached
});

//15 seconds later
// cacheData should expired
cachedData.subscribe(function (data) {
    //i'm expecting same behaviour as it was 1.st run:
    // - this runs new "heavy duty job"
    // - and after 2 seconds we got new data result
});


//....
//etc

我是 Rx(Js) 的新手,无法弄清楚如何通过冷却来实现这个热可观察对象。

4

2 回答 2

6

您所缺少的只是安排一项任务,在一段时间后cachedData用新任务替换您的任务。AsyncSubject以下是如何将其作为一种新Rx.Observable方法:

Rx.Observable.prototype.cacheWithExpiration = function(expirationMs, scheduler) {
    var source = this,
        cachedData = undefined;

    // Use timeout scheduler if scheduler not supplied
    scheduler = scheduler || Rx.Scheduler.timeout;

    return Rx.Observable.create(function (observer) {

        if (!cachedData) {
            // The data is not cached.
            // create a subject to hold the result
            cachedData = new Rx.AsyncSubject();

            // subscribe to the query
            source.subscribe(cachedData);

            // when the query completes, start a timer which will expire the cache
            cachedData.subscribe(function () {
                scheduler.scheduleWithRelative(expirationMs, function () {
                    // clear the cache
                    cachedData = undefined;
                });
            });
        }

        // subscribe the observer to the cached data
        return cachedData.subscribe(observer);
    });
};

用法:

// a *cold* observable the issues a slow query each time it is subscribed
var data = Rx.Observable.return(42).delay(5000);

// the cached query
var cachedData = data.cacheWithExpiration(15000);

// first observer must wait
cachedData.subscribe();

// wait 3 seconds

// second observer gets result instantly
cachedData.subscribe();

// wait 15 seconds

// observer must wait again
cachedData.subscribe();
于 2014-10-21T13:54:34.987 回答
1

一个简单的解决方案是创建一个自定义的可管道操作符来repeatWhen 持续时间已过。这是我想出的:

 export const refreshAfter = (duration: number) => (source: Observable<any>) =>
                                  source.pipe(
                                     repeatWhen(obs => obs.pipe(delay(duration))),
                                     publishReplay(1), 
                                     refCount());

然后我像这样使用它:

const serverTime$ = this.environmentClient.getServer().pipe(map(s => s.localTime))
const cachedServerTime$ = serverTime.pipe(refreshAfter(5000)); // 5s cache

Important note: This uses publishReplay(1), refCount() because shareReplay(1) doesn't unsubscribe from the source observable so it'll keep hitting your server forever. Unfortunately this has the consequence that on error that error will be replayed from the publishReplay(1), refCount(). There's a 'new improved' shareReplay coming soon. See notes here on a similar question. Once this 'new' version is available this answer should be updated - but the beauty of custom operators is you can fix them in one place.

于 2019-01-10T03:12:46.233 回答