首先,将您的异步操作移出subscribe
,它不是为异步操作而设计的。
您可以使用的是mergeMap
(alias flatMap
) 或 concatMap
. (我提到了他们两个,但实际上concatMap
参数设置为 1。)设置不同的并发参数很有用,因为有时您希望限制并发查询的数量,但仍然运行几个并发。mergeMap
concurrent
source.concatMap(item => {
if (item == 'do-something-async-and-wait-for-completion') {
return Rx.Observable.timer(5000)
.mapTo(item)
.do(e => console.log('okay, we can continue'));
} else {
// do something synchronously and keep on going immediately
return Rx.Observable.of(item)
.do(e => console.log('ready to go!!!'));
}
}).subscribe();
我还将展示如何对通话进行速率限制。忠告:仅在您实际需要时限制速率,例如在调用外部 API 时,该 API 每秒或分钟只允许一定数量的请求。否则最好只限制并发操作的数量并让系统以最大速度移动。
我们从以下片段开始:
const concurrent;
const delay;
source.mergeMap(item =>
selector(item, delay)
, concurrent)
concurrent
接下来,我们需要为 选择值delay
并实现selector
。concurrent
并且delay
密切相关。例如,如果我们想每秒运行 10 个项目,我们可以使用concurrent = 10
and delay = 1000
(毫秒),也可以使用concurrent = 5
anddelay = 500
或concurrent = 4
and delay = 400
。每秒的项目数将始终为concurrent / (delay / 1000)
。
现在让我们实现selector
. 我们有几个选择。我们可以为 设置一个最小的执行时间selector
,我们可以给它添加一个恒定的延迟,我们可以在结果可用时立即发出结果,我们可以在最小延迟过去后才发出结果等等。甚至可以使用timeout
运算符添加超时。方便。
设置最短时间,尽早发送结果:
function selector(item, delay) {
return Rx.Observable.of(item)
.delay(1000) // replace this with your actual call.
.merge(Rx.Observable.timer(delay).ignoreElements())
}
设置最短时间,延迟发送结果:
function selector(item, delay) {
return Rx.Observable.of(item)
.delay(1000) // replace this with your actual call.
.zip(Rx.Observable.timer(delay), (item, _))
}
添加时间,提前发送结果:
function selector(item, delay) {
return Rx.Observable.of(item)
.delay(1000) // replace this with your actual call.
.concat(Rx.Observable.timer(delay).ignoreElements())
}
添加时间,延迟发送结果:
function selector(item, delay) {
return Rx.Observable.of(item)
.delay(1000) // replace this with your actual call.
.delay(delay)
}