我有消息的来源,它是一个Observable
. 对于每条消息,我想进行一个 HTTP 调用,该调用将产生另一个Observable
,所以我将它们与 结合在一起,flatMap
然后将它们发送给某个订阅者。这里是这个场景的代码:
Rx.Observable.interval(1000)
.flatMap (tick) ->
// returns an `Observable`
loadMessages()
.flatMap (message) ->
// also returns and `Observable`
makeHttpRequest(message)
.subscribe (result) ->
console.info "Processed: ", result
这个例子是用咖啡脚本编写的,但我认为问题陈述对于任何其他 Rx 实现都是有效的。
我对这种方法的问题是loadMessages
会很快产生大量消息。这意味着,我在很短的时间内发出了很多 HTTP 请求。这在我的情况下是不可接受的,所以我想将并行 HTTP 请求的数量限制在 10 个左右。换句话说,当我发出 HTTP 请求时,我想限制管道或应用某种背压。
Rx 是否有任何标准方法或最佳实践来处理这种情况?
目前,我实现了非常简单(并且非常次优)的背压机制,如果系统处理的按摩过多,则会忽略滴答声。它看起来像这样(简化版):
Rx.Observable.interval(1000)
.filter (tick) ->
stats.applyBackpressureBasedOnTheMessagesInProcessing()
.do (tick) ->
stats.messageIn()
.flatMap (tick) ->
// returns an `Observable`
loadMessages()
.flatMap (message) ->
// also returns and `Observable`
makeHttpRequest(message)
.do (tick) ->
stats.messageOut()
.subscribe (result) ->
console.info "Processed: ", result
不过,我不确定这是否可以做得更好,或者 Rx 可能已经有一些机制来处理这种需求。