问题标签 [rx-kotlin2]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
kotlin - RxJava Flowable.Interval 使用单一平面图时的背压
我有一个场景,我需要定期调用 API 来检查结果。我正在使用Flowable.interval
创建一个调用 API 的间隔函数。
但是,我遇到了背压问题。在下面的示例中,间隔中的每个刻度都会创建一个新单曲。期望的效果是仅在调用尚未进行时才调用 API
我可以使用像这样的过滤器变量来解决这个问题:
但这似乎是一个 hacky 解决方案。我已经厌倦了使用onBackPressureDrop
该interval
功能,但它没有效果。
有什么建议么?
android - 使用动态参数在 RxKotlin 中实现共享 Flowable
我尝试过的任何方法似乎都无法解决我的问题。
我有三个具有 onClick 行为的按钮。这些按钮中的每一个都调用相同的方法launchActivity
,但具有不同的参数。launchActivity
对从方法接收到的变量进行一些 IO,onClick
然后返回一个意图。我希望能够实现一个 RxKotlin/Java Flowable 来共同处理三个 onClick 方法的背压,这样我就可以实现BackpressureStrategy.DROP
. 因此,如果在仍在线程上处理的情况下启动,则如果onClick1
启动onClick2
将被丢弃。launchActivity
onClick1
io()
如果我要以 Single 来实现它,我会实现这些onClick
方法,有点像:
但是我不知道如何launchActivity
从所有三种方法都可以访问的共享 Flowable 中调用,onClick
同时仍然允许它们传递它们的唯一变量inFile
和outFile
变量并强制执行背压。
基本标准是:
- 确保
launchActivity
在io()
线程上运行 onClick
将每个方法的唯一参数传递给launchActivity
每次onClick[#]
运行。BackpressureStrategy.DROP
用于确保仅处理系列中的第一次点击launchActivity
- 结果
intent
fromlaunchActivity
被传递给startActivity
如何实现 Flowable 以允许这种行为?
android - RxJava2 和 Kotlin:OnErrorNotImplementedException
我将 RxJava2 与 Kotlin 一起使用。我不知道为什么,但我总是有这个错误:
io.reactivex.exceptions.OnErrorNotImplementedException:一个操作未实现:未实现
所以我向订阅者添加了 onError 回调,但现在我有这个错误(在 onError 内):
kotlin.NotImplementedError: 一个操作未实现:未实现
这是我的代码:
在我的演讲者中:
如何正确处理错误?谢谢
rx-java2 - 重复自身直到找到期望值的 RxObservable
此函数的目标是创建一个流,该流定期发出值,直到遇到与谓词匹配的值。
这是我想出的一些骨架代码:
例如,如果我有以下枚举:
还有一些可以检索正确阶段的可调用对象,我应该能够传递可调用对象和检查 if 的谓词stage == FINISHED
,并进行轮询,直到我得到FINISHED
事件。
我遇到的问题是当收到的事件不是最终事件时生成一个可观察的。在这种情况下,observable 应该继续轮询事件,直到它接收到与谓词匹配的事件或直到它没有更多的订阅者。
这个可观察的应该:
- 在收到至少一个订阅者之前不要轮询
- 每 x 秒轮询一次
predicate
如果返回 true ,则将自身标记为完成- 如果从 >0 个订阅者变为 0 个订阅者,则完成自身
使用监视池只是为了确保监视同一个 id 的两个线程不会轮询两次。从地图中删除可观察对象也只是为了不堆积。出于同样的原因,只发出一个变量的 observable 不会被存储以供参考。
如何为上面添加的点添加功能?我将链接到一个我发现有用的现有RxJava Github 问题,但据我所知,它不允许处理由可调用对象发出的值的谓词。
rx-java2 - 你如何创建一个在 Rx 中的 List 上工作的计时器?
我想在完成之前查找要找到的整个项目列表,如果找不到整个列表,则将抛出异常(超时或自定义异常)。与内置的 Observable.timer() 一样,但不是在第一个项目发出后通过测试,我希望它要求找到列表中的所有项目。
这是一个例子。假设我有一些测试函数会发出 Observable<FoundNumber>。它看起来像这样:
然后将调用该函数以获取将与预期数字列表进行比较的数字。是否有来自 scanForNumbers 的其他数字不在“目标”列表中并不重要。他们只会被忽略。像这样的东西:
因此,预期的数字(202、302 和 999)与将发出的数字(202、302 和 400)不完全匹配。因此,应该发生超时,但是使用 Observable.timer() 的内置版本,它不会超时,因为至少观察到了一项。
这是我想要的。有人知道如何在 RxJava/RxKotlin 中编写代码吗?
你如何编码,希望使用 RxJava/Kotlin,一种在提到的列表上超时的方法?
kotlin - RxKotlin repo gradle build compilation fails with gradle 4.7 (JDK 1.8)
I tried the following:
It fails with:
...FAILURE: Build failed with an exception.
Where: Build file '/home/myuser/RxKotlin/build.gradle' line: 13
What went wrong: A problem occurred evaluating root project 'rxkotlin'.
org/gradle/api/internal/project/AbstractProject
java - 如果在最后一项之后经过特定时间,则发出值
我有一个 observable: Observable.create<Boolean> {emitter = it}
,我向其推送一些值。我希望它发布一个“假”值,只要某个特定时间段过去,而没有任何值被推送到该发射器。
RxJava/Kotlin 2 怎么可能?
android - 将带条件的循环转换为 RxJava 流
我有在 while 循环中执行阻塞操作的代码(从服务器下载一些数据)。客户不知道每一步要返回多少物品。下载 N 个项目时循环中断。
downloadItems
执行阻塞 HTTP 调用并返回列表。现在让我们假设downloadItems
更改和新的返回类型是Observable<Item>
. 我如何更改代码以使用 RxJava 而无需执行类似的操作blockingGet
?
android - RxJava2 onNext() 多次调用?
我有一个返回 Observable 的方法,如下所示:
并订阅如下:
当调用 getDataFromDb() 并在一段时间后根据条件调用 getDataFromApi() 时,我遇到了这个问题。对于第一次调用,它工作正常,但在第二次调用 onNext 时,会使用来自 getDataFromDb() 的旧数据响应多次调用 onNext。请让我知道我做错了什么。我对 RxJava 有点陌生。
java - Rx Kotlin/Java Observable 状态
有什么方法可以创建一个 RxJava2Observable
来通知状态变化?像:
然后在任何地方使用它,例如:
每次状态更新都必须调用此订阅