0

快速提问,因为我觉得我一定错过了什么。我在这里使用 rxjs 是因为它是我面前的东西,我相信这是一个一般的 reactiveX 问题。

假设我有一组像这样的 Observable:

network_request = some_thing // An observable that produces the result of a network call
event_stream = network_request.flatMapLatest(function(v) {
    return connectToThing(v) // This is another observable that needs v
}) // This uses the result of the network call to form a long-term event-based connection

所以,这工作正常。不过有问题。有时连接失败。

所以,如果我这样做event_stream.retry(),效果很好。当它失败时,它会重做网络调用并获得一个新v的用于建立新连接。

问题

如果我想把两件东西锁在我的network_request. 也许我希望用户界面在每次网络调用完成时做一些事情,比如v在用户界面中显示一些东西?

我可以:

shared = network_request.share() // Other implementations call this refCount
event_stream = shared.flatMapLatest(...) // same as above
ui_stream = shared.flatMapLatest(...) // Other transformation on network response

如果我不这样做,share那么它会发出两个请求,这不是我想要的,但是shareevent_stream稍后出现错误时,它不会重试网络请求,因为 refcount 仍然为 1(由于ui_stream) ,所以它立即返回完成。

我想要的是

这显然是我为了解释我的困惑而编造的一个小例子。我想要的是,每次event_stream(长期连接)的结果出现错误时,都会发生以下所有情况:

  1. 再次发出网络请求
  2. 该请求的新响应用于建立新连接并event_stream继续新​​事件,就像什么都没发生一样
  3. 也会发出相同的响应ui_stream以进行进一步处理

这感觉不是一件复杂的事情,所以在拆分/扇出 RX 事物时,我一定只是误解了一些基本的东西。

我认为我可以做但想避免的解决方法

我正在寻找导出这些 observables,所以我不能只是重新构建它们然后说“嘿,这是新事物”。我希望event_stream所有下游处理都不知道有断开连接。对ui_stream. 它刚刚获得了新的价值。

我可能可以使用 aSubject作为生成计数器来解决一些问题,每次我希望一切重新启动时都会 ping 通,然后基于此将其network_request放入 aflatMap中,这样我就可以打破share......但这感觉就像一个非常hacky的解决方案,所以我觉得必须有比这更好的方法。

我从根本上误解了什么?

4

2 回答 2

0

您需要通过使用使您的冷可观察到热Publish。阅读http://www.introtorx.com/Content/v1.0.10621.0/14_HotAndColdObservables.html#HotAndCold以获得很好的解释。

于 2016-05-15T11:51:45.300 回答
0

当我一直在思考这个问题时,我得出了与 ionoy 相同的认识,即retry只是断开和重新连接,而上游不知道这是由于错误造成的。

当我想到我想要什么时,我意识到我真的想要一个像链条这样的东西,也是一个观众,所以我现在有了这个:

network_request = some_thing
network_shadow = new Rx.Subject()

event_stream = network_request.do(network_shadow).flatMapLatest(...)
ui_stream = network_shadow.whatever

这具有重试event_stream或下游将导致整个事情重新启动的属性,而ui_stream它自己的事情。那里的任何错误都不会做任何事情,因为network_shadow它实际上不是 的订阅者event_stream,但只要主事件链正在运行,它就会剥离值。

我觉得这并不理想,但它比我担心我必须做的更好,那就是有一个restartEverything.onNext()in an doOnError,这本来是很恶心的。

我现在要处理这个,我们会看看它在哪里咬我......

于 2016-05-12T14:43:58.493 回答