问题标签 [rx-android]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
458 浏览

rx-java - 如何在 ReactiveX 中正确编写计时器 Observables?

该方法从远程端startPositionInfoPolling每 2 秒执行一次轮询PositionInfo并更新 UI。

该方法stopPositionInfoPolling停止 UI 更新。

我想更改代码,使其不定期(例如每 20 秒)获取远程数据,并定期(例如每 1 秒)使用外推值更新 UI。

我使用 RxJava 的第一个解决方案如下所示:

有人可以帮我把这段代码转换成看起来不像回调的东西吗?!我觉得flatMap是关键,但我的头脑仍然没有反应性地思考;-)

还有一个问题是mPollingTimerSubscription.unsubscribe();没有取消订阅/取消/停止mUpdatingTimerObservable,因此正在维护另一个订阅。

提前感谢您的任何评论。

更新:感谢@hello_world 降低了原始复杂性;-)

0 投票
3 回答
27834 浏览

java - Use cases for RxJava schedulers

In RxJava there are 5 different schedulers to choose from:

  1. immediate(): Creates and returns a Scheduler that executes work immediately on the current thread.

  2. trampoline(): Creates and returns a Scheduler that queues work on the current thread to be executed after the current work completes.

  3. newThread(): Creates and returns a Scheduler that creates a new Thread for each unit of work.

  4. computation(): Creates and returns a Scheduler intended for computational work. This can be used for event-loops, processing callbacks and other computational work. Do not perform IO-bound work on this scheduler. Use Schedulers.io() instead.

  5. io(): Creates and returns a Scheduler intended for IO-bound work. The implementation is backed by an Executor thread-pool that will grow as needed. This can be used for asynchronously performing blocking IO. Do not perform computational work on this scheduler. Use Schedulers.computation() instead.

Questions:

The first 3 schedulers are pretty self explanatory; however, I'm a little confused about computation and io.

  1. What exactly is "IO-bound work"? Is it used for dealing with streams (java.io) and files (java.nio.files)? Is it used for database queries? Is it used for downloading files or accessing REST APIs?
  2. How is computation() different from newThread()? Is it that all computation() calls are on a single (background) thread instead of a new (background) thread each time?
  3. Why is it bad to call computation() when doing IO work?
  4. Why is it bad to call io() when doing computational work?
0 投票
3 回答
5562 浏览

android - Schedulers.io 创建了数百个 RxCachedThreadScheduler

我在我的 android 应用程序中使用 RxJava,它多次遇到 OutOfMemoryError。我用设备管理器检查了它,我刚刚注意到,我有超过 200 个线程,其中大多数处于等待状态,通常是 RxCachedThreadScheduler。由于线程过多而引发 OOMError。我还注意到,如果我按下一个按钮,它会调用一个服务并获取一个令牌并缓存它,线程数会增加 5!

所以,我用谷歌搜索发现,Schedulers.io 可以创建无限线程。当我用 Schedulers.computation 替换每个 Schedulers.io 时,问题就消失了,但这没有任何意义,因为我使用 Schedulers.io 就像它应该使用的那样。

那么我如何使用 Schedulers.io 并确保它不会创建太多线程呢?

更新

我这样取消订阅:

更新#2

我使用 Schedulers.io 的常规方式是:

和:

0 投票
1 回答
3325 浏览

android - 为什么在这段代码中没有调用 OnComplete?(RxAndroid)

我知道在发出所有项目时会调用 Observer 的 OnComplete 。在下面的代码中,我将光标中的数据放入 flatMap 运算符中的 ArrayList 中。我的光标有 100 个条目( c.getCount() 给出 100 ),我的列表大小为 100。 onNext 也被调用了 100 次。但是 onComplete 没有被调用。我在 onComplete 中填充一个列表视图。

新的 Observable 流将在 100 之后完成,但事实并非如此。

任何帮助表示赞赏,谢谢

0 投票
1 回答
1330 浏览

rx-java - rxjava - 使用 concat 从 observables 缓存不起作用

按照本教程 ,我认为我会创建一个虚假的内存、磁盘、网络对象,以便在其中一个失败时查看它们的顺序。我的目标是查看一个是否失败,它会根据文章评论连接到下一个:

这种模式的关键是 concat() 只在需要时订阅每个子 Observable。如果数据被缓存,则不会对较慢的源进行不必要的查询,因为 first() 会提前停止序列。换句话说,如果内存返回一个结果,那么我们就不会费心去磁盘或网络。相反,如果内存和磁盘都没有数据,它将发出新的网络请求。

然后我创建了以下测试:

但是,在运行此测试时,我期望它打印磁盘 observable (stream) 。所以输出应该是 38 Oriole 等,因为第一个 observable 返回 null 并且如果我认为失败,concat 应该得到下一个流。我的意思是根据文章,如果一个流失败,例如内存流,那么 concat 应该转到下一个流,即磁盘。但对我来说,它没有这样的事情。发射在第一个 null 错误处停止。我究竟做错了什么。在我继续使用反应式编程制作真正的缓存系统之前,我正在做这个假测试以确保这种方式有效。

顺便说一下,这里是 rxjava 依赖 im 使用:

0 投票
1 回答
349 浏览

android - 如何在没有 Subjects 的情况下使用 RXJava 和 Observables 抽象出硬件?

我正在做一个项目,我有一个硬件设备通过 USB 电缆与我的 Android 应用程序通信。包裹在薄序列化层中的协议缓冲区是我的通信协议。

目前,USB 通信使用监视 USB 的异步侦听器进行。从这里,我读取来自我的硬件的字节,对它们采取行动(例如,反序列化它们,解开 protobuf 等......),并适当地设置任何成员变量等。

我对此类 + 服务的公共 API 是 2 个 Observables:

诀窍是:填充 DeviceInfo 需要 5 次来回异步调用(不是我的设计),只要应用程序打开,DeviceState 就会不断从硬件发送出去。

对于设备信息,我等到所有 5 个都准备好,然后通知我的观察者(我知道我可以进行订阅者端过滤,但我宁愿推送一个完全就绪的 DeviceInfo),对于状态,我只是推送那些当我收到它们时。

我的问题:有没有办法在不使用主题作为 DeviceInfo 和 DeviceState 的后备存储的情况下做到这一点?我一直在阅读有关 Subjects 是多么邪恶的文章,虽然我不相信它们是邪恶的(但是,它们不一定是“功能性的”),但这似乎是我无法控制的外部 IO 的一个很好的用途。

0 投票
2 回答
3970 浏览

android - 在 RxJava 中,如何启动从 API 生成的可能无限的事件流?

我有一个可供客户使用的 API,可以简化为:

Event每当客户端调用 API(技术上通过 Binder 到Service派生类)时,实例就会进入我的系统,然后对这些 API 进行处理、过滤并分派到其他内部组件。我不关心过去的事件,只关心订阅者订阅时可用的事件。它似乎很适合 Rx 范式,我刚刚开始接触它。

我需要一个创建一次的 Observable,允许多个订阅者,并且可以提供实例Event,然后通过反应管道发送给观察者。ASubject似乎适合我想要做的事情(特别是,这个问题的答案引起了我的共鸣)。

其他 RxJava 用户推荐什么?

0 投票
1 回答
2432 浏览

android - 字符串值更改时 RxAndroid 更新字段/文本视图

我刚刚开始使用 RxJava/RxAndroid,我想知道是否可以使用它来解决以下问题。基本上,给定一个字段,比如一个文本视图和一个值,一个字符串,我正在寻找一种在字符串值发生变化时自动更新文本视图的方法。我不确定我将如何将其实现为 Observable。让我演示一下;

我正在寻找的 RxAndroid/RxJava 是否可行?

0 投票
2 回答
5263 浏览

android - RxJava 的 .debounce() 干扰了我的 Observable 的线程和错误处理

我想在我的 Android 应用程序中进行标准搜索,在其中输入 an EditText,稍等直到用户完成输入,然后使用 Retrofit 启动网络请求:

现在,我有一个测试服务器,它接受改造调用并返回一个列表。当我输入“崩溃”时,服务器会执行一些无效代码和错误,返回 http 状态代码 500 和一些错误 html。因此,改造调用失败。

我认为外部 Observable 链不应该受此影响。请参阅我之前的问题:In RxJava, how to retry/resume on error, 而不是完成 observable

但是,外部 Observable 也会出错,导致链终止。错误是: The current thread must have a looper!

奇怪的。现在我尝试不使用.debounce()同样的事情,服务器有一个内部错误,但外部 Observable没有错误。

那么它对.debounce()导致这种行为的线程做了什么?我该如何解决它?

0 投票
1 回答
1064 浏览

android - 如果发生异常,将 flatMap 短路

我有一个flatMap运行一系列 HTTP 调用来注册用户、检索他们的个人资料和其他一些事情的方法。我想根据错误发生的阶段通知onErrorsubscription特定的错误消息。flatMap也许注册成功,但检索创建的配置文件却没有。我怎样才能做到这一点,以便flatMap在发生错误时停止所有后续调用?

这是我的一些伪代码Observableand Subscription