问题标签 [rx-java]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
911 浏览

rx-java - 几个 observables 链在一起完成

我需要使用第一个 observable 结果来创建第二个 observable 并订阅它的结果,有时它需要 3 到 4 层 observables。完成此任务的最佳方法是什么?

0 投票
1 回答
734 浏览

android - RxJava Android - 在适当的线程上加载缓存显示数据

我正在探索 RxJava 及其对 Android 的适用性,并且我正在尝试实现一个简单的 load-cache-display 用例,如下面的 ASCII 图表所示:

这是我最初想出的代码:

正如预期的那样,网络和缓存是在后台线程上执行的,而数据的显示则是在 UI 线程上进行的。问题是返回的数据onErrorReturn()经过相同的过滤和缓存循环,这是多余的。但是,如果我将代码更改为:

displayData()永远不会被调用。组成这些可观察对象以实施我所拥有的方案的正确方法是什么?

0 投票
1 回答
67 浏览

scala - 注册多个完成例程

我有一个ObservableStream[Item]Item这里只是举例)构造的这个 observable 通过一组转换:

我想在完成每个转换(第一次和第二次)时执行一些操作。如果我调用doOncompleted方法observable1并且observable2我没有得到想要的效果(实际上我没有观察到任何副作用)。

我怎样才能做到这一点?这可能吗?

0 投票
5 回答
30335 浏览

android - Rx Observable 定期发射值

我必须定期轮询一些 RESTful 端点以刷新我的 android 应用程序的数据。我还必须根据连接暂停和恢复它(如果手机离线,甚至不需要尝试)。我当前的解决方案正在运行,但它使用标准 JavaScheduledExecutorService来执行周期性任务,但我想留在 Rx 范式中。

这是我当前的代码,为简洁起见,跳过了部分代码。

networkStatusObservable基本上是一个广播接收器包裹在Observable<Boolean>中,表示手机已连接到网络。

正如我所说,这个解决方案是有效的,但我想使用 Rx 方法进行定期轮询和发出 new UserProfiles,因为手动安排事情有很多问题,我想避免这些问题。我知道Observable.timerand Observable.interval,但不知道如何将它们应用于此任务(而且我不确定是否需要使用它们)。

0 投票
7 回答
31831 浏览

java - RxJava 中的 concatMap 和 flatMap 有什么区别

看起来这两个功能非常相似。它们具有相同的签名(接受rx.functions.Func1<? super T, ? extends Observable<? extends R>> func),并且它们的大理石图看起来完全相同。无法在此处粘贴图片,但这是concatMap的一张,这是flatMap的一张。对 results的描述似乎存在一些细微的差别Observable,其中一个由concatMapcontains 产生的项是由连接结果 Observables 产生的,而一个由flatMapcontains 产生的项是由首先合并产生的 Observables 并发出该合并的结果而产生的。

但是,我完全不清楚这种微妙之处。谁能更好地解释这种差异,最好举一些例子来说明这种差异。

0 投票
1 回答
117 浏览

system.reactive - 用于忽略相等项目的 RxJava 实用程序?

我试图在 RxJava 中找到 Observable 运算符,它将执行以下过滤器:

我可以构建一些东西,但我认为标准库中一定有我忽略的东西。

distinct不起作用,因为它只返回0 5 1.

0 投票
1 回答
563 浏览

reactive-programming - RxJava:阻塞直到重试

我正在尝试学习反应式编程,作为第一个“真正的”应用程序,我选择了一个简单的 IRC 客户端,带有RxJavaRxNetty

如果第一个服务器失败,我目前正在重试另一台服务器。所以,我有一个Observable<Server>,它由网络的服务器组成。它可以使用 设置.repeat(),因此它无限期地重复服务器。RxClient::connect现在,我如何使它成为一个阻塞的,以便一次只使用一个,并且只有在失败或连接超时时才会转到下一个?

虽然其他解决方案是使用域,每次我连接并使用时都会将我重定向到不同的服务器.retry(),但我有兴趣以反应方式解决问题。

连接到所有服务器(不带.repeat())工作正常,但这不是我想要的:

0 投票
2 回答
233 浏览

reactive-programming - Concat 运算符语义,但立即订阅所有不依赖的 observables

我想连接一个冷和一个热的可观察对象。也就是说,结果 observable 应该首先发出冷 observable 的结果,然后是 hot observable 的结果。同时,我希望订阅第二个 observable,即 hot,在订阅第一个 observable 的同时发生,否则我会错过其中的一个重要事件。

这看起来与会做的事情非常相似merge。但我想保证热的 observable 在冷的 observable 完成之前不会推送任何东西,这merge并不能保证。解决这个问题的正确方法是什么?

0 投票
1 回答
3450 浏览

java - RXJava 结合多个订阅

所以我有一个我似乎根本无法解决的情况。

我有一种情况,我想并行运行两个网络请求,然后在每个网络请求结束时运行一些代码,然后在每个网络请求处理结束时额外运行。

建模成这样

我一直在尝试做一个 Observable.merge 但这似乎很有希望,因为它不允许我将订阅代码与一个大型处理程序分开。有人有建议吗?

0 投票
1 回答
1332 浏览

java - 在 RxJava 中重试异步操作链的内置或推荐方式

我有一个在 RxJava 中建模的相互依赖的异步操作图。对于某些错误,应重新运行整个图形。retry(..) 运营商不直接支持这一点,因为任何错误都会呈现给所有订阅者。由于retry(..)操作员只是重新订阅,他们总是从最终的 observable 中得到错误,只计算一次。即重新订阅时不会再次执行该工作。

我尝试创建一个特殊的 observable,它为每个订阅调用一个 observable 生成方法。在这种情况下,重试操作符主要按预期工作,并且在一些额外的缓存操作之后,完全按预期工作。

然而,这似乎很常见,以至于我怀疑我在重复 RxJava 中已经提供的工作。我还担心我的解决方案的健壮性,因为我正在尝试在低级别做一些事情,可能没有足够的 RxJava 知识来这样做。另一个问题是可组合性:要支持所有三种retry(..)形式,我需要三个版本的包装方法。

下面的演示解释了我正在尝试做的事情以及到目前为止的成功。

在 RxJava 中是否有更简单或更惯用(或两者兼有)的方法来进行这种重试?