问题标签 [rx-java3]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
3805 浏览

java - 为什么不调用 doOnDispose?

当像这样创建 Observable 时:

doOnSubcribe被调用,doOnDispose不被调用。

这是为什么?

0 投票
0 回答
96 浏览

java - 具有无限流的 RxJava `switchOnNext`

我正在尝试编写一个使用switchOnNext运算符的简单示例。我想生成无限流的无限流。每个单独的流将生成从 1 到无穷大。

使用switchOnNext我希望每个可观察对象都会发出它的第一个n元素,然后是下一个元素,依此类推。

为了生成一个从 1 到无穷大的值的 observable,我实现了静态rangeInf函数。该main方法包含应该打印值的逻辑。

然而,当运行程序时,只有第一个流被订阅,并且只有它的值被打印到控制台。为了理智起见,我采用了可观察的区间(内置的也是无限的),但那里的行为与预期的一样。

我在这里想念什么?

起初我认为这是因为间隔是在它自己的单独线程上。但是我尝试添加.observeOn(Schedulers.computation())rangeInf方法,但这似乎也不能解决问题。

输出与interval

输出与Observable.generate

资源

0 投票
1 回答
509 浏览

java - RXJava 顺序执行 observable

我有多个返回Observable<String>. 每个函数在文件系统上执行命令。我需要一个接一个地执行每个函数,并在 observable 中获取函数的输出。最后,我想要一个Observable<String>包含按函数调用顺序输出的所有函数的输出

单独地,每个功能都按预期工作,但我需要正确合并输出。

我试过 Observable.concatArray(func1, func2, ... ) 像这样:

但这只是保留了 observable 事件的顺序。不是函数的顺序。我的意思是如果 func1 发出事件 A 和 A',而 func2 发出 B 和 B',我将有 A->A'->B->B'。但是 func2 会在 func1 之后立即启动。这导致我的问题是 func1 需要在 func2 启动之前完成。

第一个函数通过 maven 在文件系统上生成目录。所以,一个长期的任务。第二,在这个目录中写入一个文件。但是 concatArray 在第一个之后立即启动第二个。并且命令失败,因为此时该目录不存在。

有没有办法避免像这样丑陋的事情:

0 投票
0 回答
169 浏览

server - RxJava 在微服务服务器项目中处理 Observable/Flowable

我将 RxJava 用于服务器微服务项目,其中使用 Jetty 作为 HTTP Servlet 服务器。

我正在使用 Observable 处理来自客户端或主服务器的请求,用于不同的流程。当一个请求点击下面的 api 时,我会在 Observable 完成工作后返回一个响应。

我想知道是否需要处理这些 Observables 或 Flowables。据此:当他们调用完成或错误时,RxJava2 是否自动处理 observable? 和 RxJava3 源代码,我认为 Flowable 至少不会自动处理?

如果我需要手动处理资源,最好创建一个,然后在每个被调用的Observer( )处CompositeDisposable添加一次性,完成后调用。CompositeDisposableObservable1...Observable3onSubscribe()compositeDisposable.dispose()concat

我是否还应该监视 JettyAbstractLifeCycle来处理这些 Observables(听起来像 Android)?我不确定其他人如何在服务器端使用 RxJava,对这些问题的任何建议以及服务器项目中的一般 Rx 方法持开放态度。

谢谢!

0 投票
0 回答
1675 浏览

android - Rxjava 3、retrofit2 和多次调用问题

我在实现 RxJava3/Retrofit2 时遇到了麻烦,我需要完成的是:

总体思路是将云数据库同步到设备上的 SqLite 数据库(房间)。 数据库可能会变大,大约 100,000 个寄存器或更多,因此同步过程可能需要一些时间,我的第一次尝试是在一个请求中完成并获取所有寄存器,然后将它们保存到 SqLite(房间),但这在某些情况下,取决于设备产生了一些内存不足的异常,所以经过一些研究,我发现 RxJava 是答案,还实现了一些 API 调用分页。

  1. 首先,我试图做一个概念验证,启动第一个改造调用并显示对活动的响应,但我无法让它工作,我被卡住了!!

这是我得到的错误:java.lang.ClassCastException: io.reactivex.rxjava3.internal.observers.LambdaObserver cannot be cast to io.reactivex.rxjava3.core.Observable

  1. 我想做的是,我在这里寻求帮助是;第一次调用,我可以获得注册总数并定义页数,基于此,发送多个改造请求,每次我得到响应(List < Item >),将它们保存到房间。

遵循一些代码

毕业典礼

类项目

类 ItemSyncDetails

接口 FrekuencyApi

类 MainActivity

错误日志

json 为 - http://192.168.1.10:82/api/v1.0/item?pageSize=1¤tPage=1&sortBy=1

在此先感谢,非常感谢您的帮助。

我对 MainActivity 类做了一些更改

随着对 MainActivity 类的更改,基本上有两个方法 getRecordsCount() 和 getAllRecords(int numPages),每个触发一个 RX Java 调用进程,第一个调用一个 Retrofit 调用,并根据第一个调用的答案,调用第二种方法,它将向 API 发送 (n) 请求,其中 n 是参数 numPages,我使用更新的 progressDialog,当孔进程完成时,progressDialog 是 close()。在第一个 RX java 调用完成的 onComplete 方法上,它调用了第二个 RX Java 方法,此时我需要的是:

  1. 如何在一个 Rx java 进程中将其改进为更优雅的代码,而不是两次调用?
  2. 我正在考虑在 getAllHandleResults() 方法上对数据库进行房间插入调用以获取列表,这是插入数据库的正确位置吗?
  3. 你对这个问题有什么建议吗?这个数据库部分也可以使用 RX Java 完成吗?你有一些网页可以让我获得有关它的信息吗?再次感谢您的帮助。
0 投票
0 回答
277 浏览

java - RxJava 方法转换为 RxJava3

这是一个用 RxJava 编写的方法

我正在尝试将其转换为 RxJava3,但某些参数已更改: Func1 已更改为 Function Action0 已更改为 Action

在我进行更改后,过滤器中出现以下错误:

谁能帮我?谢谢!

0 投票
3 回答
331 浏览

android - RxJava: Skip all the errors in fromIterable() and notify subscriber when all the items are emitted - Flowable.parallel execution

I have an API call which verifies some status against an "Id". The API returns Single or error. I have a list of such Id's, Only one Id is valid to return success or none (all id's return error). What I need is, Iterate through each Id and skip the errors from API call, until either a success or end of the list. I am able to achieve this sequentially. However, I am trying to do the same, using ParallelFlowable. It works fine when an Id returns success, But when there is no id which returns success (all ids fail), then it just skip all the errors from API, but does not notify the subscriber after all the ids are validated. I am not sure how to handle this.

0 投票
0 回答
186 浏览

android - 使用 Retrofit + RxJava 的 Android Twitter API 状态/过滤器:永远不会点击 onNext 或 onError

我尝试使用来自 Twitter 的 Streaming API:statuses/filter 作为以下链接 (1) (1) https://stream.twitter.com/1.1/statuses/filter.json?track=twitter

文档:https ://developer.twitter.com/en/docs/tweets/filter-realtime/api-reference/post-statuses-filter

但是,在request_Token(2) 响应中获得正确的令牌并向状态/过滤器发出请求后,我确实收到了响应 200,但仍然没有命中onNextonError无休止。

(2) https://api.twitter.com/oauth/request_token

2020-05-09 13:09:26.023 28542-28617/com.example.myapplication D/OkHttp: <-- 200 https://stream.twitter.com/1.1/statuses/filter.json?track=foo&follow=1234 (1002ms)

我不确定我的代码中是否缺少某些内容或错误。请问我可以有一些反馈吗?这是我的代码:

谢谢大家。

0 投票
1 回答
1331 浏览

android - Rxjava 3 + Retrofit2 - 多次插入数据库问题

我正在尝试执行以下操作;使用 Retrofit 将云数据库同步到设备上的本地 SqLite 数据库(房间)。DB 可能会变大,大约有 100,000 个或更多寄存器,因此同步过程可能需要一些时间。所以它发送第一个Retrofit请求来获取寄存器的数量,这样它就可以计算出总页数,然后它会发送多个Retrofit Request,从API中获取所有数据,每次请求后,它将数据保存到房间。

现在,我在组合两个 RxJava 调用或进程时遇到了麻烦,同样在第二个 RxJava 进程上,在 Retrofit 调用之后,有一个对象列表的房间插入,但是在洞进程结束后,我注意到没有插入了 100% 的所有记录,每次我运行该进程时,插入的记录数都会发生变化,大约是 80% - 98%,但从来没有 100%,即使所有的 Retrofit 调用都已发送。

请帮助我:

  1. 如何在一个 RxJava 调用中完成所有过程,而不是像我现在拥有的 2 个?
  2. 如何将 100% 的记录插入到 Room?

按照代码:

摇篮

ItemSyncDetails

物道

注意:我没有使用 Observer/Flowable/Maybe/Single,因为我已经能够使其与 RxJava 一起使用

数据接口

项目存储库

http://192.168.1.10:82/api/v1.0/item?pageSize=1¤tPage=1&sortBy=1

注意:页面大小可能会改变,我使用的是每页 100 个项目的固定大小。

0 投票
0 回答
89 浏览

bluetooth-lowenergy - RxAndroidBle 和 RxJava3 的状态如何?

RxAndroidBle 是一款很棒的软件,它减少了 Ble 项目的开发时间,并显着提高了稳定性和可读性。

我只想问,RxAndroidBle 的 rxjava3 分支是什么状态?它说它是一个测试分支,目前已经过时。那么生产应用是不是不建议走rxjava3分支呢?

我想知道未来是否有计划专注于 rxjava3,因为在 RxJava 页面上它说:

2.x 版本处于维护模式,仅在 2021 年 2 月 28 日之前通过错误修复获得支持。

来自 t4rj4n 的问候