问题标签 [rx-java2]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
706 浏览

rx-java - Vertx HttpClient 和 RxJava 的背压

我在创建“背压系统”时遇到了问题。我正在使用 Vertx HttpClient 和 RxJava。我需要向外部服务发出 6000 个请求,并且为了避免 waitForQueue 中的满,由于这个外部服务不能像我发送的那样快速处理,所以我在请求/响应之间设置了延迟。

由于此旅程是作为批处理过程进行的,因此无需担心需要一分钟。

这是我的代码

此方法从每 24 小时运行一次的 Observable 间隔调用,并通过此子组列表(6000)

但是在检查我的日志后,我看不到我的请求之间有 50 毫秒的延迟

这是我的 3 条日志

知道我需要做什么才能实现这一目标吗?

问候。

解决方案

我最终使用concatMapPlease 如果您有更好的解决方案,请告诉我

0 投票
0 回答
508 浏览

android - 仅重试失败的可观察 RxJava 2

我正在尝试运行一个长时间运行的任务,该任务可能对我尝试重试的列表中的某些对象失败,但它重新订阅了整个可观察对象列表。我可以进行嵌套订阅,但这似乎是错误的。有没有比嵌套订阅更好的解决方案?

这是我的实现:

0 投票
1 回答
557 浏览

android - 经常调用回调方法将事件转发到 Observable?

所以我在自定义滑块视图上有这个监听器。onSliderChanged(int percent)当用户滑动滑块时视图调用。我正在尝试在滑块更改时进行网络调用,但我不想进行一百万次网络调用,因为该方法在滑动时经常被调用。我怎样才能把这个监听器回调变成一个 Observable?我知道一旦它是可观察的,我就可以使用 debounce 并且仅在特定时间间隔后更新。

我试过做Observable.create(),但我在一个回调方法中,我不知道它是如何工作的。我正在使用 Kotlin 和 RxJava2 顺便说一句。

0 投票
1 回答
1218 浏览

rx-java - RxJava:将 observable 拆分为两个列表

用例:我必须更新存储在本地数据库中的当前订单。步骤是:

  1. 从后端下载数据(~800 项)
  2. 检查本地数据库是否已包含项目。后端人员使用字符串作为主键。
  3. 如果该项目不在数据库中,请添加它。
  4. 如果该项目在数据库中,请更新它。

我的第一个解决方案很容易做到。只需调用后端并为每个项目创建两个数据库查询。首先检查它是否已经存在,然后添加或更新它。

正如您可能想象的那样,它很慢。每次更新大约需要 17.5 秒。

第二种解决方案:将数据库数据缓存到一个列表中,而不是每次在ArrayList中搜索时都查询数据库。这导致更新时间下降到 16.5 秒。我曾经从数据库调用defer创建一个并获取结果。ObservablecombineLatest

当前解决方案: 瓶颈当然是数据更新。我使用的 orm 库支持批量更新。所以我必须创建两个列表:DataToUpdateDataToInsert

当前解决方案在大约 5.3 秒内运行。我对时间感到满意,但对这样做的非反应性方式不满意。

问题

你知道如何以被动的方式解决这个问题吗?是否有一个运算符允许将 observable 拆分为两个列表。或者也许我应该使用一个全局变量(似乎是个坏主意)来保存要插入哪些数据和更新哪些数据的信息?

0 投票
1 回答
379 浏览

android - takeUntil 使用 Retrofit、RxJava 和 Gson 优化缓存和网络方法

我正在尝试使用RxJava'takeUntil运算符从服务器获取数据来实现优化的缓存和网络方法。我正在使用 Gson 模型来解析来自 API 的 JSON 响应。

由于使用 Gson 模型,我无法从服务器检索数据。因为返回的类型对于网络请求和磁盘缓存都不匹配。

我一直在测试几种方法,但没有成功。

ApiService.java

交互器.java

交互器.java

缓存数据

Gson 模型解析器

请求响应.java

谢谢 :) 。

0 投票
1 回答
548 浏览

android - Android RxJava 和可观察对象的链接

我正在尝试将几个可观察对象链接在一起,并根据已执行的可观察对象执行一些操作。但我面临着奇怪的行为。

在此示例中,日志将每 2 秒显示一次,但视图的所有更改都将在最后一个 observable 完成时完成。

我认为这是 AndroidScheduler.mainThread() 的行为,因为当我删除这一行并用这样的视图包装更改时

行为变得正确。那么有人可以解释这种行为并提出解决这个问题的正确方法吗?

0 投票
1 回答
8372 浏览

java - “不存在类型变量 R 的实例,因此 Observable 符合 Observable”更新到 RxJava2 时出错

我正在尝试使用retrofit and rxJava. 下面的代码在使用时似乎运行良好RxJava 1,但是一旦我更新到RxJava 2我就会收到此错误:

错误 :

不存在类型变量 R 的实例,因此 Observable 符合 Observable

api

Api 请求在这里完成,这就是我在.map操作员内部得到这个错误的地方

结果类模型:

编辑 :

导入rx.Observable而不是io.reactivex.Observable代码时工作得很好。

谢谢

0 投票
0 回答
476 浏览

android - RxJava2:在给定调度程序上使用 PublishSubject 更改事件?

挑战:我希望有一个存储库来生成查询(Query<T>),调用者可以为其指定调度程序,并以任何它想要的方式进行操作,这反过来也自动更新,因为更改事件通过总线在内部传播(PublishSubject,除非有更好的方法?让我知道)。

调度程序来自调用者,而不是直接从存储库“继承”,这一点很重要;因为某些查询将被合并,因此不能来自不同的线程。基本上,我希望能够为整个 observable 链指定调度程序,并有可能在其中有多个查询 observables。

一般来说 - 这很好用......但是一旦将更改事件引入图片中,事情就会变得非常混乱,非常快。据我了解,这是因为主题将在与调用者相同的线程上传播其事件。这导致Query<T>上述根据调用的位置在不同的调度程序上执行其工作,这subject.onNext(..)就是挑战。

如何通过 PublishSubject 强制 ChangeEvents 在Observable<Query<T>>与其创建的调度程序相同的调度程序上进行观察?没有明确指定 .observeOn()?

我对挑战的不同方法持开放态度,但从我的实验来看,这似乎是“正确”的方法。如果还有其他方法可以解决,请赐教。

我感谢您的智慧和帮助。对冰淇淋的渴望感到抱歉。

0 投票
1 回答
358 浏览

rx-java - RxJava2:第一个订阅和最后一个取消订阅的观察者的 PublishProcessor 回调

我使用 aPublishProcessor<?>来服务多个观察者。有没有办法知道第一个观察者何时被订阅,最后一个观察者何时被处置?

0 投票
2 回答
288 浏览

java - RxJava 访问项下游

我已经将几个rx操作员链接在一起来执行多项任务。我需要从下游的父流中的对象访问字段。

IE。如何访问channel.uid下游?