问题标签 [rx-java3]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
android - 在 rxjava 中订阅消费者?
我在我的代码中替换了对 Consumer 的操作调用,但是在订阅它时一直要求我将其投射到观察者
下面是代码
我得到错误.subscribe(subscriptionPlans);
将其转换为.subscribe((Observer<? super List<ContentDatum>>) subscriptionPlans);
正确的方法应该是什么?
在运行代码时,我得到了异常
java - RxJava PublishSubject 订阅不提供任何东西。订阅有什么问题吗?
学习 RxJava 将不胜感激任何建议。
计算准备好后需要接收对象,所以我PublishSubject
在models方法中做了:
然后尝试订阅演示者:
这给了我什么。但是如果我使用ReplaySubject
- 它会给我所有的结果,但似乎它只使用一个线程。所以我认为我在订阅方面出了点问题,它应该在更早的地方。我需要准确地使用PublishSubject
给我的结果,因为它们已经准备好使用多个线程。
如何解决?或者也许还有其他问题?
rx-java - RxJava:如何取消订阅 observable 的主题
使用 RxJava3,给定 anObservable
和 a Subject
,我可以将 Subject 订阅到 Observable:
后来我的 Subject 对 Observable 不再感兴趣了,如何退订 Observable 呢?
java - RxJava 阀门用例
RxJava 中是否有运算符、外部库或我缺少创建可流动/可观察的方法来接收控制数据发射的函数,例如阀门?
我有一个巨大的 json 文件需要处理,但我必须获取文件的一部分、实体列表、处理它然后获取另一部分,我尝试使用 windows()、buffer() 但我通过了 BiFunction to Flowable.generate() 在我收到第一个列表并且我还没有完成处理后继续执行。我还尝试了 hu.akarnokd.rxjava3.operators 中的 FlowableTransformers.valve() 但它只是在处理列表的 flatMap() 函数之前堆积了项目
编辑:我需要控制项目的排放,以免内存和处理数据的函数过载,因为该函数读取和写入数据库,处理也需要按顺序进行。处理数据的函数并不完全是我的,它是用 RxJava 编写的,预计我会使用 Rx。
我设法像这样解决它,但如果有其他方法请告诉我:
Edit2:这是我目前使用该功能的方式之一
java - 当使用 groupBy 运算符时,Flowable 会忽略 Subscription.request
我是 RxJava 的新手,如果有人能澄清这一点,那就太好了……
考虑以下流程:
- 每次订阅者请求一个值时生成/发出一个整数。
- 对生成的整数进行分组,以便每个值构成自己的组。注意:目标是模拟输入中存在大量组的情况。
- 现在对于每个组:
- 将值收集到一系列 1 秒长的块中,并在第一个空块到达时停止。注意:在现实世界中,每组会有多个值,但这里每组恰好有 2 个缓冲区:1 个带有单个 int 的缓冲区和 1 个空缓冲区,这将有效地完成序列。
- 将所有非空块(在本例中为 1)合并到一个列表中。
- 将结果列表打印到标准输出中并再请求 1 个元素。
上述流程的原始实现:
如您所见,我只创建了 1 个自定义订阅者,它在创建订阅时请求 1 个项目,并且在每次处理列表时再请求 1 个。虽然flatMap
'smaxConcurrency
设置为Integer.MAX_VALUE
(因为我想处理尽可能多的组) - 所有其他“提示”(例如 groupBy 和 flatMap's bufferSize
)都设置为 1 并且没有其他缓冲(例如onBackpressureBuffer
,“缓冲有限数量的来自当前的项目Flowable
并允许它尽可能快地发射”)被请求。
所以问题是onNext
's 的调用次数(因此,它请求一个值)远低于我传递给的 lambda 调用次数Flowable.generate
- 大多数时候超过 40,000 次,并且我希望它至少与通过Subscrtiption.request
. 正如我可以通过 lambda 的调用堆栈判断的那样,每次创建一个组时,它都会调用io.reactivex.rxjava3.internal.operators.flowable.FlowableGroupBy.GroupBySubscriber#onSubscribe
,然后调用org.reactivestreams.Subscription#request
传递bufferSize
(在上面的代码片段中配置为 1 )给它,所以这个过程几乎是自我延续的,所以说话。我不能指定bufferSize
为 0,我也不能限制同时处理的组数:有限maxConcurrency
值很快导致MissingBackpressureException
...
我是否在这里遗漏了一些基本的东西,或者没有办法在这样的用例中真正应用背压?
提前致谢。
java - RxJava:如何使用 Observable 代替嵌套循环?
现在我使用嵌套循环ReplaySubject
来获取多个可观察对象,这需要在不使用任何循环的情况下进行转换,只使用可观察对象。
" Map<Integer, Integer> elem : list
" 有 3 个元素。" MapOperationName.values()
" 返回 7 个元素。所以我在上面的例子中完全应该是 21 岁。
试图重构这样的东西,但只得到 3 个元素:
如何正确组合ArrayList
并Enum
获得全套配对组合?
java - RxJava:如何使代码异步工作?
如果我对异步工作的结论有误,请向我提出建议,并就如何使这项工作异步工作提出任何建议。
此代码执行网格单元中计算的设置结果。我假设异步工作应该按部分显示网格单元。因此,如果我们有 8 个内核作为结果,我们可以看到显示了 8 个单元格,并且在一段时间内显示了另外 8 个,依此类推(如果我设置了一些时间延迟)。但是现在作为结果单元格显示成为一个接一个。
模型:
主持人:
UPD:现在我制作了测试示例并尝试正确订阅主题。如果我使用 subject.onNext() 它异步工作,但我认为这是错误的,因为它无法检查 subject.hasComplete() 并获得“真”。请参阅下面的评论“TODO”。
摇篮:
android - retrofit2 和 rxjava3:java.lang.IllegalArgumentException:找不到 io.reactivex.rxjava3.core.Observable 的调用适配器
我想使用 retrofit2 和 rxjava3 但我看到以下错误
此错误表示Rxjava 没有适配器,但我在下面的行中添加了它
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
和我的 ApiInterface
并用于活动
和 build.gradle
rx-java - RxJava Flowable 中的延迟元素
delayElements
@ProjectReactor 发布者 Flux 中有一个非常直观的运算符,用于在每个发射的元素之间引入延迟。例如,下面的 cod 每秒发出一个元素。
为了在@Rxjava 中完成相同的行为,我必须做一些仪式。
有谁知道在@Rxjava 中执行上述操作的更好方法?
android - RxJava Junit4 多重可观察测试
第一次测试Rx,所以请给一些建议。
需要在模型中测试可观察链,结果给出 21 个数据对象:
这是我尝试过的一些方法。
首先尝试(这里我在获取结果时遇到问题并单独检查它们,因为每个对象内部的计算结果未知):
第二种方式:
像我用时间延迟和循环指出第二种方式那样进行测试是否正确?
如何正确检查结果它给出了 21 个对象并检查数据类的某些属性大于零的条件?还有其他需要执行的检查吗?
还有一个普遍的问题:需要测试使用此模型方法的 mvp Presenter 并因此显示收到的值。如何仅使用 JUnit4 来做到这一点?