问题标签 [rx-java3]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
44 浏览

android - 重复请求 X 次或直到返回 Y 个项目

在以下条件下创建 Observable 时遇到问题:

  1. 从 API 获取项目。API 可以返回 0 到 10 个项目。
  2. 如果返回的项目少于 10 个,请从 API 请求更多项目。
  3. 重复 5 次或直到收集到 10 个或更多项目。

到目前为止,我有这个 Observable:

这工作正常,一个挑剔。如果 API 返回 9 个项目,然后返回 10 个项目,则 Observable 返回 10 个项目。剩下的 9 个被丢弃,我不希望这样。有什么办法让它这样工作吗?

0 投票
1 回答
204 浏览

java - rxJava如何使flatMap在多线程上运行

我希望从 flatMap 发出的每个项目都在自己的线程上运行
这是一个真实用法的简化示例,其中每个项目都是一个 url 请求。
在每个单曲上添加 subscribeOn(Schedulers.io()) 仍然在单个线程上运行
这里的规则是什么?

结果:

0 投票
0 回答
60 浏览

android - Single.concatDelayArray 和 .firstOrError() 运算符

我的应用程序中有一个案例,我想从多个来源获取并返回第一个有效数据。我认为 .concatDelayArray() 和 .first() 将是有效的选择,但如果任何非最后一个 Singles 抛出错误,使用它们会导致崩溃。

这是我的测试用例:

这以崩溃结束:

这种情况有更好的运营商吗?

0 投票
1 回答
33 浏览

java - RxJava中值发射代码和值接收代码的线程执行

我有以下代码:

所以这里我们有 2 个重要的 lambda 表达式。第一个是我们传递给 Observable.create 的那个,第二个是我们传递给 Observable.subscribe() 的回调。在第一个 lambda 中,我们创建一个新线程,然后在该线程上发出值。在第二个 lambda 中,我们有代码来接收在第一个 lambda 代码中发出的那些值。我观察到两个代码都在同一个线程上执行。

为什么会这样?默认情况下,RxJava 是否在同一线程上运行代码发射值(可观察)和代码接收值(观察者)?

0 投票
1 回答
76 浏览

rx-java - 使用 Resilience4j RateLimiter 减慢 RxJava3 Flowable

我已经尝试过 Resilience4J 示例,但速率限制器似乎不符合我的要求:

它仍然在不到 200 毫秒的时间内处理 100 个项目。我一定是做错了什么,不知道是什么。有人可以帮忙吗?

0 投票
1 回答
45 浏览

rx-java - rxJava如何在能够访问先前参数的同时进行顺序调用

这是我在服务器中创建文件记录需要遵循的流程

黑色箭头是流
红色箭头是依赖
这是一大功能
在此处输入图像描述

我需要帮助在 rxjava 中设计它,使它们按顺序发生,而后面的单曲能够获得参考

我可以为每次执行任务创建一些单曲

这是我尝试
使用 flatMap 的 resultSelector 的尝试,如此处所述
Retrofit and RxJava: How to combine two requests and get access to both results?

0 投票
2 回答
290 浏览

java - 在 RxJava 中,Emitter 接口和 Observer 接口有什么区别?两者都有相同的方法

在 RxJava 中,Emitter 接口和 Observer 接口有什么区别?两者都有相同的方法

0 投票
3 回答
132 浏览

rxjs - 从不可预测的源 Observable 构建“心跳”Observable

我有一个 Observable ,source它可能会在不可预测的时间发出项目。我正在尝试使用它来构建另一个每 500 毫秒可靠地发出其值的 Observable。

假设source在这些时间发出值:

  • 100 毫秒 - 第一项
  • 980ms - 第二项
  • 1020ms - 第三项
  • 1300ms - 第四个项目,等等。

我想“平滑”这个流,以便得到如下输出:

  • 500 毫秒 - 第一项
  • 1000 毫秒 - 第二项
  • 1500ms - 第三项
  • 2000ms - 第四项

一种天真的方法可能是在源项目的排放之间添加延迟。但是,这不会像我想要的那样创建均匀间隔的间隔。

我尝试了, 和的各种组合.timer(),但没有任何希望。.interval().flatMap()

0 投票
0 回答
345 浏览

android - Android/Java 上的 RxJava3:使用 SingleObserver 的问题(但 Observer 很好!)

问题是我可以使用 anObserver但我不能使用SingleObserver. 它给了我这个错误:

以下是一些实现细节:

故障在于:

如果我替换SingleObserverObserver( 并实现它编译好的所有必需的onXXX()方法。但我想要一个观察者。

我应该降级到 rxjava2(未经测试)吗?

我怀疑我的进口产品很混乱。

似乎有 2 个SingleObserver,另一个在io.reactivex.SingleObserver.

我确实尝试使用这些导入并跳过了rxjava3大多数的前缀。不过AndroidSchedulers只能在io.reactivex.rxjava3.android.schedulers.AndroidSchedulers.

0 投票
0 回答
70 浏览

rx-java - RxJava3 - 如何在使用共享运算符时使用 doFinally 和线程安全操作来避免死锁?

我正在尝试根据一个 ID 将相同的 Flowable 共享给多个订阅者,而每个订阅者都可以在需要时取消订阅以取消订阅。如果某个 id 的所有订阅者都取消订阅,然后另一个订阅者尝试订阅该 id,则应该创建一个新订阅。下面的代码是用 Kotlin 编写的,试图实现这个功能。

当取消订阅,然后订阅相同的 id 时,就会出现问题。由于 share 返回的 FlowableRefCount 的工作方式,我发现这种方法可能存在死锁。在底层实现中,FlowableRefCount 在其 subscribeActual 和 cancel 方法中使用了同步机制,虽然乍一看并不明显,但 doFinally 方法的动作是在该锁定机制内运行的。这有点违反直觉,因为在 doFinally 文档中它说:

请注意,onFinally 操作在订阅之间共享,因此应该是线程安全的。

正因为如此,出现了以下场景:

  • 通过取消锁定 FlowableRefCount 实例
  • subscribeToProperties 调用的 id 与取消中的相同
  • subscribeToProperties 在调用 subscribe(然后是 subscribeActual)时被阻塞,因为已经通过取消在 FlowableRefCount 上获得了锁
  • 调用 removeOrderSubscription 被阻塞,因为 SharedFlowProvider 上的锁定已经通过 subscribeToProperties 方法获得

我还尝试通过以下方式使用 ConcurrentHashMap 将锁定从 subscribeToProperties 方法中分离出来:

但是在这种方法中,如果我们在方法的最后一行之前有一个订阅然后快速取消,我们最终可能会将流放入地图中,即使我们没有订阅者。

我将不胜感激有关如何在没有死锁或竞争条件的情况下实现此功能的一些想法。

谢谢