我有一个事件流,我想调用一个函数来为每个事件返回一个承诺,问题是这个函数非常昂贵,所以我想一次最多处理 n 个事件。
这个卵石图可能是错误的,但这是我想要的:
---x--x--xxxxxxx-------------x-------------> //Events
---p--p--pppp------p-p-p-----p-------------> //In Progress
-------d--d--------d-d-dd------dddd--------> //Promise Done
---1--21-2-34-----------3----4-3210-------- //QUEUE SIZE CHANGES
这是我到目前为止的代码:
var n = 4;
var inProgressCount = 0;
var events$ = Rx.Observable.fromEvent(produceEvent, 'click')
.map((ev) => new Date().getTime());
var inProgress$ = events$.controlled();
var done$ = inProgress$
.tap(() => inProgressCount++)
.flatMap((timestamp) => Rx.Observable.fromPromise(expensiveComputation(getRandomInt(1, 5) * 1000, timestamp)));
done$.subscribeOnNext((timestamp) => {
inProgressCount--;
inProgress$.request(Math.max(1, n - inProgressCount));
});
inProgress$.request(n);
这段代码有两个问题:
- 它使用了
inProgressCount
用副作用函数更新的 var。 当我从受控流中请求超过 1 个项目时,仅调用一次 done$ 订阅。这使得inProgressCount
var 更新不正确,这最终将队列限制为一次一个。
你可以在这里看到它工作:http: //jsbin.com/wivehonifi/1/edit ?js,console,output
问题:
- 有更好的方法吗?
- 我怎样才能摆脱
inProgressCount
变量? 为什么在请求多个项目时只调用一次 done$ 订阅?
更新:
对问题 #3 的回答:switchMap 与 flatMapLatest 相同,这就是为什么我只得到最后一个。将代码更新为 flatMap 而不是 switchMap。