---abcde-----f-------gh-----i----> //Events
我有一个我想观察/订阅的“工作队列”。这是要处理的命令对象数组。新的工作项目通常会以突发的方式到达,并且需要按顺序进行处理(按收到的顺序,一次一个,直到完全处理)。
我正在使用 RxJS 5.0.0-beta.6。(其他图书馆强加的版本)
这是一个工作示例,说明了我想要的行为,但使用了 RxJS v4。
有问题的主要代码是这个......
var events$ = Rx.Observable.fromEvent(produceEvent, 'click')
.timestamp()
.tap(({timestamp}) => updatePanelAppend(pending, timestamp));
var inProgress$ = events$;
var done$ = inProgress$
.flatMapWithMaxConcurrent(1, ({timestamp}) =>
Rx.Observable.fromPromise(() => {
updatePanelAppend(inProgress, timestamp);
removeFromPanel(pending, timestamp);
return expensiveComputation(getRandomInt(1, 5) * 1000, timestamp)
}));
done$.subscribeOnNext((timestamp) => {
updatePanelAppend(done, timestamp);
removeFromPanel(inProgress, timestamp);
});
http://jsbin.com/meyife/edit?js,输出
鉴于 API 的当前 beta 状态和不完整/更改的文档,我无法弄清楚如何在 RxJS 5 中执行此操作。
更新:这个从 v4 迁移到 v5 的迁移指南显示了许多被删除的功能,但没有指导如何以新的方式做事。已移除操作的示例:.tap、.controlled、.flatMapWithMaxConcurrent(重命名)。