0
---abcde-----f-------gh-----i---->  //Events

我有一个我想观察/订阅的“工作队列”。这是要处理的命令对象数组。新的工作项目通常会以突发的方式到达,并且需要按顺序进行处理(按收到的顺序,一次一个,直到完全处理)。

我正在使用 RxJS 5.0.0-beta.6。(其他图书馆强加的版本)

这是一个工作示例,说明了我想要的行为,但使用了 RxJS v4。

有问题的主要代码是这个......

var events$ = Rx.Observable.fromEvent(produceEvent, 'click')
  .timestamp()
  .tap(({timestamp}) => updatePanelAppend(pending, timestamp));

var inProgress$ = events$;

var done$ = inProgress$
  .flatMapWithMaxConcurrent(1, ({timestamp}) => 
                            Rx.Observable.fromPromise(() => {
                              updatePanelAppend(inProgress, timestamp);
                              removeFromPanel(pending, timestamp);
                              return expensiveComputation(getRandomInt(1, 5) * 1000, timestamp)
                           }));

done$.subscribeOnNext((timestamp) => {
  updatePanelAppend(done, timestamp);
  removeFromPanel(inProgress, timestamp);
});

http://jsbin.com/meyife/edit?js,输出

鉴于 API 的当前 beta 状态和不完整/更改的文档,我无法弄清楚如何在 RxJS 5 中执行此操作。

更新:这个从 v4 迁移到 v5 的迁移指南显示了许多被删除的功能,但没有指导如何以新的方式做事。已移除操作的示例:.tap、.controlled、.flatMapWithMaxConcurrent(重命名)。

4

1 回答 1

2
  • flatMap/mergeMap - 现在接受一个并发参数

  • tap->do

  • subscribeOnNext不再存在,因此只需subscribe与单个参数一起使用。

  • fromPromiseRxJS 5 上不存在重载,因此请defer改用。

在此处查看更新的 jsbin

于 2016-08-03T16:32:46.710 回答