快速提问,因为我觉得我一定错过了什么。我在这里使用 rxjs 是因为它是我面前的东西,我相信这是一个一般的 reactiveX 问题。
假设我有一组像这样的 Observable:
network_request = some_thing // An observable that produces the result of a network call
event_stream = network_request.flatMapLatest(function(v) {
return connectToThing(v) // This is another observable that needs v
}) // This uses the result of the network call to form a long-term event-based connection
所以,这工作正常。不过有问题。有时连接失败。
所以,如果我这样做event_stream.retry()
,效果很好。当它失败时,它会重做网络调用并获得一个新v
的用于建立新连接。
问题
如果我想把两件东西锁在我的network_request
. 也许我希望用户界面在每次网络调用完成时做一些事情,比如v
在用户界面中显示一些东西?
我可以:
shared = network_request.share() // Other implementations call this refCount
event_stream = shared.flatMapLatest(...) // same as above
ui_stream = shared.flatMapLatest(...) // Other transformation on network response
如果我不这样做,share
那么它会发出两个请求,这不是我想要的,但是share
当event_stream
稍后出现错误时,它不会重试网络请求,因为 refcount 仍然为 1(由于ui_stream
) ,所以它立即返回完成。
我想要的是
这显然是我为了解释我的困惑而编造的一个小例子。我想要的是,每次event_stream
(长期连接)的结果出现错误时,都会发生以下所有情况:
- 再次发出网络请求
- 该请求的新响应用于建立新连接并
event_stream
继续新事件,就像什么都没发生一样 - 也会发出相同的响应
ui_stream
以进行进一步处理
这感觉不是一件复杂的事情,所以在拆分/扇出 RX 事物时,我一定只是误解了一些基本的东西。
我认为我可以做但想避免的解决方法
我正在寻找导出这些 observables,所以我不能只是重新构建它们然后说“嘿,这是新事物”。我希望event_stream
所有下游处理都不知道有断开连接。对ui_stream
. 它刚刚获得了新的价值。
我可能可以使用 aSubject
作为生成计数器来解决一些问题,每次我希望一切重新启动时都会 ping 通,然后基于此将其network_request
放入 aflatMap
中,这样我就可以打破share
......但这感觉就像一个非常hacky的解决方案,所以我觉得必须有比这更好的方法。
我从根本上误解了什么?