问题标签 [rx-java3]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
349 浏览

android - 在 rxjava 中订阅消费者?

我在我的代码中替换了对 Consumer 的操作调用,但是在订阅它时一直要求我将其投射到观察者

下面是代码

我得到错误.subscribe(subscriptionPlans);将其转换为.subscribe((Observer<? super List<ContentDatum>>) subscriptionPlans);

正确的方法应该是什么?

在运行代码时,我得到了异常

0 投票
0 回答
22 浏览

java - RxJava PublishSubject 订阅不提供任何东西。订阅有什么问题吗?

学习 RxJava 将不胜感激任何建议。

计算准备好后需要接收对象,所以我PublishSubject在models方法中做了:

然后尝试订阅演示者:

这给了我什么。但是如果我使用ReplaySubject- 它会给我所有的结果,但似乎它只使用一个线程。所以我认为我在订阅方面出了点问题,它应该在更早的地方。我需要准确地使用PublishSubject给我的结果,因为它们已经准备好使用多个线程。

如何解决?或者也许还有其他问题?

0 投票
1 回答
275 浏览

rx-java - RxJava:如何取消订阅 observable 的主题

使用 RxJava3,给定 anObservable和 a Subject,我可以将 Subject 订阅到 Observable:

后来我的 Subject 对 Observable 不再感兴趣了,如何退订 Observable 呢?

0 投票
1 回答
154 浏览

java - RxJava 阀门用例

RxJava 中是否有运算符、外部库或我缺少创建可流动/可观察的方法来接收控制数据发射的函数,例如阀门?

我有一个巨大的 json 文件需要处理,但我必须获取文件的一部分、实体列表、处理它然后获取另一部分,我尝试使用 windows()、buffer() 但我通过了 BiFunction to Flowable.generate() 在我收到第一个列表并且我还没有完成处理后继续执行。我还尝试了 hu.akarnokd.rxjava3.operators 中的 FlowableTransformers.valve() 但它只是在处理列表的 flatMap() 函数之前堆积了项目

编辑:我需要控制项目的排放,以免内存和处理数据的函数过载,因为该函数读取和写入数据库,处理也需要按顺序进行。处理数据的函数并不完全是我的,它是用 RxJava 编写的,预计我会使用 Rx。

我设法像这样解决它,但如果有其他方法请告诉我:

Edit2:这是我目前使用该功能的方式之一

0 投票
0 回答
61 浏览

java - 当使用 groupBy 运算符时,Flowable 会忽略 Subscription.request

我是 RxJava 的新手,如果有人能澄清这一点,那就太好了……

考虑以下流程:

  1. 每次订阅者请求一个值时生成/发出一个整数。
  2. 对生成的整数进行分组,以便每个值构成自己的组。注意:目标是模拟输入中存在大量组的情况。
  3. 现在对于每个组:
  • 将值收集到一系列 1 秒长的块中,并在第一个空块到达时停止。注意:在现实世界中,每组会有多个值,但这里每组恰好有 2 个缓冲区:1 个带有单个 int 的缓冲区和 1 个空缓冲区,这将有效地完成序列。
  • 将所有非空块(在本例中为 1)合并到一个列表中。
  • 将结果列表打印到标准输出中并再请求 1 个元素。

上述流程的原始实现:

如您所见,我只创建了 1 个自定义订阅者,它在创建订阅时请求 1 个项目,并且在每次处理列表时再请求 1 个。虽然flatMap'smaxConcurrency设置为Integer.MAX_VALUE(因为我想处理尽可能多的组) - 所有其他“提示”(例如 groupBy 和 flatMap's bufferSize)都设置为 1 并且没有其他缓冲(例如onBackpressureBuffer,“缓冲有限数量的来自当前的项目Flowable并允许它尽可能快地发射”)被请求。


所以问题是onNext's 的调用次数(因此,它请求一个值)远低于我传递给的 lambda 调用次数Flowable.generate- 大多数时候超过 40,000 次,并且我希望它至少与通过Subscrtiption.request. 正如我可以通过 lambda 的调用堆栈判断的那样,每次创建一个组时,它都会调用io.reactivex.rxjava3.internal.operators.flowable.FlowableGroupBy.GroupBySubscriber#onSubscribe,然后调用org.reactivestreams.Subscription#request传递bufferSize(在上面的代码片段中配置为 1 )给它,所以这个过程几乎是自我延续的,所以说话。我不能指定bufferSize为 0,我也不能限制同时处理的组数:有限maxConcurrency值很快导致MissingBackpressureException...

我是否在这里遗漏了一些基本的东西,或者没有办法在这样的用例中真正应用背压?

提前致谢。

0 投票
2 回答
195 浏览

java - RxJava:如何使用 Observable 代替嵌套循环?

现在我使用嵌套循环ReplaySubject来获取多个可观察对象,这需要在不使用任何循环的情况下进行转换,只使用可观察对象。

" Map<Integer, Integer> elem : list" 有 3 个元素。" MapOperationName.values()" 返回 7 个元素。所以我在上面的例子中完全应该是 21 岁。

试图重构这样的东西,但只得到 3 个元素:

如何正确组合ArrayListEnum获得全套配对组合?

0 投票
1 回答
413 浏览

java - RxJava:如何使代码异步工作?

如果我对异步工作的结论有误,请向我提出建议,并就如何使这项工作异步工作提出任何建议。

此代码执行网格单元中计算的设置结果。我假设异步工作应该按部分显示网格单元。因此,如果我们有 8 个内核作为结果,我们可以看到显示了 8 个单元格,并且在一段时间内显示了另外 8 个,依此类推(如果我设置了一些时间延迟)。但是现在作为结果单元格显示成为一个接一个。

模型:

主持人:

UPD:现在我制作了测试示例并尝试正确订阅主题。如果我使用 subject.onNext() 它异步工作,但我认为这是错误的,因为它无法检查 subject.hasComplete() 并获得“真”。请参阅下面的评论“TODO”。

摇篮:

0 投票
1 回答
1294 浏览

android - retrofit2 和 rxjava3:java.lang.IllegalArgumentException:找不到 io.reactivex.rxjava3.core.Observable 的调用适配器

我想使用 retrofit2 和 rxjava3 但我看到以下错误

此错误表示Rxjava 没有适配器,但我在下面的行中添加了它

.addCallAdapterFactory(RxJava2CallAdapterFactory.create())

和我的 ApiInterface

并用于活动

和 build.gradle

0 投票
1 回答
229 浏览

rx-java - RxJava Flowable 中的延迟元素

delayElements@ProjectReactor 发布者 Flux 中有一个非常直观的运算符,用于在每个发射的元素之间引入延迟。例如,下面的 cod 每秒发出一个元素。

为了在@Rxjava 中完成相同的行为,我必须做一些仪式。

有谁知道在@Rxjava 中执行上述操作的更好方法?

0 投票
0 回答
47 浏览

android - RxJava Junit4 多重可观察测试

第一次测试Rx,所以请给一些建议。

需要在模型中测试可观察链,结果给出 21 个数据对象:

这是我尝试过的一些方法。

首先尝试(这里我在获取结果时遇到问题并单独检查它们,因为每个对象内部的计算结果未知):

第二种方式:

像我用时间延迟和循环指出第二种方式那样进行测试是否正确?

如何正确检查结果它给出了 21 个对象并检查数据类的某些属性大于零的条件?还有其他需要执行的检查吗?

还有一个普遍的问题:需要测试使用此模型方法的 mvp Presenter 并因此显示收到的值。如何仅使用 JUnit4 来做到这一点?