0

我正在使用Android ReactiveLocation 库来接收定期位置更新。即使我的应用程序中没有任何东西在使用它们,我也想继续接收位置更新,这样我总是有一个最新的位置,我可以在需要时立即使用。

这就是我开始在我的应用程序的核心组件中获取位置更新的方式。我想将最新值和任何新检测到的位置重新发布给我的应用程序链中更下游的任何订阅者,这就是replay(1)目的。

locationProvider = new ReactiveLocationProvider(context);
locationObservable = locationProvider.getUpdatedLocation(locationRequest)
.replay(1);

在应用程序的其他地方,我订阅了这个重新发布的 obervable:

locationSubscription = locationObservable
        .filter(new Func1<Location, Boolean>() {
            @Override
            public Boolean call(Location location) {
                return location.getAccuracy() < LOCATION_ACCURACY_THRESHOLD;
            }
        })
        .subscribe(new Action1<Location>() {
            @Override
            public void call(Location location) {
            }
        });

这似乎可以完成工作:我的最终订阅者立即获得最新的位置,并继续接收新的更新,但我想确保当最终订阅者时,我不会在链中某处建立大量未使用位置的缓冲区未订阅。我是 Rx 菜鸟。背压如何应用于这种情况?正在replay(1)做我所期望的,并丢弃除最新位置之外的所有不需要的位置?

4

1 回答 1

1

有几个因素会影响回放。replay(1)执行背压协调并根据订阅者的要求重放值。

这意味着如果订阅者在异步边界之后,replay将开始累积值。发生这种情况的速率取决于当前的订阅者集:运营商请求订阅者请求的最大值,因此混合有界和无界订阅者如果以不同的速率消费可能会导致缓冲区膨胀。

您可以将其缓冲区视为一个单链表,其中每个订阅者都指向该列表中的一个节点,而操作员则指向列表末尾之前的节点。由于单链接,如果没有订阅者,可以将列表的前面进行垃圾收集。

如果有一个取消订阅的大型请求者,replay()将“丢弃”多余的值并继续一次持有 1 个元素,慢慢地“消失”。

另一种方法是使用BehaviorSubject它将记住最后一个值并且根本不做背压,因此没有缓冲区膨胀的风险,但有MissingBackpressureException.

编辑:

实际上,我认为 replay() 中存在背压错误,因此冷源可能无法正常工作。

于 2015-10-16T18:56:25.240 回答