问题标签 [rx-kotlin2]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
2638 浏览

kotlin - RxJava Flowable.Interval 使用单一平面图时的背压

我有一个场景,我需要定期调用 API 来检查结果。我正在使用Flowable.interval创建一个调用 API 的间隔函数。

但是,我遇到了背压问题。在下面的示例中,间隔中的每个刻度都会创建一个新单曲。期望的效果是仅在调用尚未进行时才调用 API

我可以使用像这样的过滤器变量来解决这个问题:

但这似乎是一个 hacky 解决方案。我已经厌倦了使用onBackPressureDropinterval功能,但它没有效果。

有什么建议么?

0 投票
1 回答
99 浏览

android - 使用动态参数在 RxKotlin 中实现共享 Flowable

我尝试过的任何方法似乎都无法解决我的问题。

我有三个具有 onClick 行为的按钮。这些按钮中的每一个都调用相同的方法launchActivity,但具有不同的参数。launchActivity对从方法接收到的变量进行一些 IO,onClick然后返回一个意图。我希望能够实现一个 RxKotlin/Java Flowable 来共同处理三个 onClick 方法的背压,这样我就可以实现BackpressureStrategy.DROP. 因此,如果在仍在线程上处理的情况下启动,则如果onClick1启动onClick2将被丢弃。launchActivityonClick1io()

如果我要以 Single 来实现它,我会实现这些onClick方法,有点像:

但是我不知道如何launchActivity从所有三种方法都可以访问的共享 Flowable 中调用,onClick同时仍然允许它们传递它们的唯一变量inFileoutFile变量并强制执行背压。

基本标准是:

  • 确保launchActivityio()线程上运行
  • onClick将每个方法的唯一参数传递给launchActivity每次onClick[#]运行。
  • BackpressureStrategy.DROP用于确保仅处理系列中的第一次点击launchActivity
  • 结果intentfromlaunchActivity被传递给startActivity

如何实现 Flowable 以允许这种行为?

0 投票
0 回答
1246 浏览

android - RxJava2 和 Kotlin:OnErrorNotImplementedException

我将 RxJava2 与 Kotlin 一起使用。我不知道为什么,但我总是有这个错误:

io.reactivex.exceptions.OnErrorNotImplementedException:一个操作未实现:未实现

所以我向订阅者添加了 onError 回调,但现在我有这个错误(在 onError 内):

kotlin.NotImplementedError: 一个操作未实现:未实现

这是我的代码:

在我的演讲者中:

如何正确处理错误?谢谢

0 投票
1 回答
134 浏览

rx-java2 - 重复自身直到找到期望值的 RxObservable

此函数的目标是创建一个流,该流定期发出值,直到遇到与谓词匹配的值。

这是我想出的一些骨架代码:

例如,如果我有以下枚举:

还有一些可以检索正确阶段的可调用对象,我应该能够传递可调用对象和检查 if 的谓词stage == FINISHED,并进行轮询,直到我得到FINISHED事件。

我遇到的问题是当收到的事件不是最终事件时生成一个可观察的。在这种情况下,observable 应该继续轮询事件,直到它接收到与谓词匹配的事件或直到它没有更多的订阅者。

这个可观察的应该:

  • 在收到至少一个订阅者之前不要轮询
  • 每 x 秒轮询一次
  • predicate如果返回 true ,则将自身标记为完成
  • 如果从 >0 个订阅者变为 0 个订阅者,则完成自身

使用监视池只是为了确保监视同一个 id 的两个线程不会轮询两次。从地图中删除可观察对象也只是为了不堆积。出于同样的原因,只发出一个变量的 observable 不会被存储以供参考。

如何为上面添加的点添加功能?我将链接到一个我发现有用的现有RxJava Github 问题,但据我所知,它不允许处理由可调用对象发出的值的谓词。

0 投票
1 回答
61 浏览

rx-java2 - 你如何创建一个在 Rx 中的 List 上工作的计时器?

我想在完成之前查找要找到的整个项目列表,如果找不到整个列表,则将抛出异常(超时或自定义异常)。与内置的 Observable.timer() 一样,但不是在第一个项目发出后通过测试,我希望它要求找到列表中的所有项目。

这是一个例子。假设我有一些测试函数会发出 Observable<FoundNumber>。它看起来像这样:

然后将调用该函数以获取将与预期数字列表进行比较的数字。是否有来自 scanForNumbers 的其他数字不在“目标”列表中并不重要。他们只会被忽略。像这样的东西:

因此,预期的数字(202、302 和 999)与将发出的数字(202、302 和 400)不完全匹配。因此,应该发生超时,但是使用 Observable.timer() 的内置版本,它不会超时,因为至少观察到了一项。

这是我想要的。有人知道如何在 RxJava/RxKotlin 中编写代码吗?

你如何编码,希望使用 RxJava/Kotlin,一种在提到的列表上超时的方法?

0 投票
1 回答
26 浏览

kotlin - RxKotlin repo gradle build compilation fails with gradle 4.7 (JDK 1.8)

I tried the following:

It fails with:

...FAILURE: Build failed with an exception.

  • Where: Build file '/home/myuser/RxKotlin/build.gradle' line: 13

  • What went wrong: A problem occurred evaluating root project 'rxkotlin'.

    org/gradle/api/internal/project/AbstractProject

0 投票
1 回答
123 浏览

java - 如果在最后一项之后经过特定时间,则发出值

我有一个 observable: Observable.create<Boolean> {emitter = it},我向其推送一些值。我希望它发布一个“假”值,只要某个特定时间段过去,而没有任何值被推送到该发射器。

RxJava/Kotlin 2 怎么可能?

0 投票
2 回答
4221 浏览

android - 将带条件的循环转换为 RxJava 流

我有在 while 循环中执行阻塞操作的代码(从服务器下载一些数据)。客户不知道每一步要返回多少物品。下载 N 个项目时循环中断。

downloadItems执行阻塞 HTTP 调用并返回列表。现在让我们假设downloadItems更改和新的返回类型是Observable<Item>. 我如何更改代码以使用 RxJava 而无需执行类似的操作blockingGet

0 投票
1 回答
1421 浏览

android - RxJava2 onNext() 多次调用?

我有一个返回 Observable 的方法,如下所示:

并订阅如下:

当调用 getDataFromDb() 并在一段时间后根据条件调用 getDataFromApi() 时,我遇到了这个问题。对于第一次调用,它工作正常,但在第二次调用 onNext 时,会使用来自 getDataFromDb() 的旧数据响应多次调用 onNext。请让我知道我做错了什么。我对 RxJava 有点陌生。

0 投票
1 回答
321 浏览

java - Rx Kotlin/Java Observable 状态

有什么方法可以创建一个 RxJava2Observable来通知状态变化?像:

然后在任何地方使用它,例如:

每次状态更新都必须调用此订阅