问题标签 [rx-kotlin2]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
160 浏览

kotlin - 在最后插入 observable 之后,switchOnNext 运算符不会为订阅发出

首先,一些背景(也许有更好的方法):

我们有一个模块可以在特定的 Observable 上发出传入的蓝牙消息。然后我们处理这些消息,最后在最后订阅以发送消息。这种处理可能会在某个时候发生变化,这对于大多数处理意味着重新创建中间 Observables,以及依赖它的所有 observables(因为它们现在将处理无效数据)。

我们想改变它,以便重新创建处理的某些部分不需要重新创建依赖它的所有内容,主要是因为我们不必一直记住什么依赖于什么,并且还让具有内部状态的操作符(如缓冲、扫描或去抖动)不会丢失这个内部状态。

有希望的解决方案:

通过使用 switchOnNext 操作符,我们可以解决这个问题。每当重新创建中间 observable 时,我们只需将其添加到 switchOnNext 的原点,订阅 switchOnNext 输出的任何人都会立即获得新结果。

问题:

如果 switchOnNext 之后的处理必须改变,它将停止获取结果,直到之前的 observable 改变。这意味着我们现在遇到了相反的问题。每当某些部分发生变化时,我们必须递归地重新创建它所依赖的所有内容。这稍微好一点(跟踪依赖的东西比跟踪依赖它的所有东西要容易得多),但是 observables 仍然会丢失内部状态,因为它们必须重新创建。

这种行为似乎违反了文档所说的应该发生的事情,但它并没有明确说明一种或另一种方式。

示例代码:

此代码演示了该问题。

此代码的输出:

预期输出:

第一次 obsAux1 发出时,所有三个订阅都应该打印出来,但只有在它被添加到 publishSubject 之前的那些会打印出来。

第二次 obsAux1 发出时,不应打印任何内容,因为 obsAux2 已经插入。这按预期工作

obsAux2 第一次发出时,所有四个订阅都应该打印。第三个订阅按预期打印,这应该订阅工作正常。但是第四个订阅没有打印任何内容,因为它是在将 obsAux2 插入到 publishSubject 之后添加的。

0 投票
1 回答
181 浏览

kotlin - 如何在 Kotlin DSL 中设置 Rx Action Consumer

我正在研究开发用于命令的KotlinDSLRxKotlin

我有一些工作,但是我会改进设置 Rx Action(s) 和Consumer(s) 的方法。

我目前拥有的代码类似于:-

我使用此代码如下: -

我想“改进”的代码是在设置Action或时Consumer

这是我可以用来设置“onCompile”操作的唯一方法吗?

例如,我必须提及“Action {}”还是有一个“速记”版本我可以使用来达到相同的结果?

0 投票
1 回答
118 浏览

rx-java2 - RxJava/RxKotlin:如果一个源完成(不是全部),combineLatest 已经完成

基本上,我有两个Flowables F并且G我想combineLatest在它们上使用,但我希望组合Flowable在完成时已经F完成(即使G仍在运行)。

这是一个我用丑陋的解决方案实现的示例:

我们可以将其提取到扩展函数中:

有没有更好的表达方式?

0 投票
2 回答
412 浏览

kotlin - RxJava:忽略 Flowable 中的一些错误

我有一个我想忽略Flowable<T>的子类型和错误IgnoreThisError(导致正常完成),所有其他错误都应该向下游传播。

科特林示例:

这个函数 onErrorComplete 是我需要的并且不存在......

0 投票
1 回答
75 浏览

rx-java2 - 使用`onBackpressureLatest`在阻塞Flowable中删除中间消息

我有一个链,我在其中执行一些阻塞 IO 调用(例如 HTTP 调用)。我希望阻塞调用消耗一个值,继续进行而不中断,但同时丢弃所有堆积的东西,然后以相同的方式消耗下一个值。

考虑以下示例:

从幼稚的角度来看,我希望它会打印出类似的东西0, 10, 20, ...,但它会打印出0, 1, 2, ....

我究竟做错了什么?

编辑:

我想过天真地添加debounce来吃掉传入的流:

但是,现在我得到一个java.lang.InterruptedException: sleep interrupted.

编辑:

似乎有效的方法如下:

输出符合预期0, 10, 20, ...!!

这是正确的方法吗?

我注意到这throttleLast将切换到计算调度程序。有没有办法回到原来的调度程序?

编辑:

我也偶尔会遇到java.lang.InterruptedException: sleep interrupted这种变体。

0 投票
0 回答
30 浏览

rx-java2 - `Flowable.interval`和阻塞链没有`MissingBackpressureException`异常

考虑以下示例:

MissingBackpressureException不要错过背压,我希望得到Flowable.interval.

我怎么了?

0 投票
2 回答
212 浏览

rxjs - Rx (RxKotlin) - rightGroupJoin 使用 groupJoin - 合并/组合两个不同类型的 observable

经过几天的努力,似乎是一个简单的任务,我来找你们:)

想法很简单。我有两个流/可观察对象,“左”和“右”。我希望“右”中的项目缓冲/收集/聚合到“左”中的“当前”项目。
因此,“左”中的每个项目都定义了一个新的“窗口”,而所有“右”项目都将绑定到该窗口,直到发出新的“左”项目。因此,可视化:

任务:
'左':|- A - - - - B - - C - - - -|
'正确' : |- 1 - 2 - 3 -4 - 5 - 6 - - -|
'结果' : |- - - - - - - -x - - -y - - - -z| ( Pair<Left, List<Right>>)
其中:A,1B,4 (所以 x) ; C (so y) 同时发出
所以:x = Pair(A, [1,2,3]), y = Pair(B, [4, 5])
并且:'right' & 'result' 完成/当 'left' 完成时终止
So: z = Pair(C, [6]) - 由于 'left' 完成而发出

----
编辑 2 - 最终解决方案!
为了将“右”项与下一个“左”而不是前一个聚合,我将代码更改为更短/更简单的代码:

编辑 1 - 初始解决方案!
取自下面@Mark(已接受)的答案,这就是我想出的。
它被分成更小的方法,因为我也可以multiRightGroupJoin()根据需要加入尽可能多的(正确的)流。

在哪里:

----

到目前为止我得到了什么
经过大量阅读并了解不知何故没有开箱即用的实现,我决定使用groupJoin,主要使用这个链接,就像这样:(这里有很多问题和需要改进的地方,不要使用此代码)

我还使用类似的用法来创建一个timedBuffer()- 与每个缓冲区()相同buffer(timeout)但带有时间戳,List以了解它何时开始。Observable.interval(timeout)基本上通过在(作为“左”)上运行相同的代码

问题/问题(从最简单到最难)

  1. 这是做类似事情的最佳方式吗?这不是矫枉过正吗?
  2. 当“左”完成时,是否有更好的方法(必须)来完成“结果”(和“右”)?没有这个丑陋的布尔逻辑?
  3. 这种用法似乎弄乱了 rx 的顺序。请参阅下面的代码和打印:

    /li>

打印(有时!看起来像竞争条件)以下内容:
doOnComplete - before join
doOnComplete - after join
doOnComplete 2 - before join

  1. 上述代码第一次运行时,doOnComplete - after join不调用,第二次调用两次。第三次就像第一次,第四次就像第二次,等等......
    3,4 都是使用此代码运行的。可能与 subscribe {} 用法有关?请注意,我不持有一次性用品。这个流结束了,因为我 GC 'left' observable

    /li>

注意1:添加.takeUntil { thisCompleted }之后mergeAllSingles()似乎修复了#4。

注意2:在使用此方法加入多个流并应用'Note1'后,很明显onComplete(在groupJoin()调用之前!!!)将被调用的次数与'right' Observables一样多,可能意味着原因是right.takeUntil { thisCompleted },关闭“正确”流真的很重要吗?

Note3:关于 Note1,它似乎与 takeUntil 与 takeWhile 非常相关。使用 takeWhile 降低了 doOnComplete 调用,这在某种程度上是合乎逻辑的。仍在尝试更好地解决它。

  1. 除了在 groupJoin * rightObservablesCount 上运行 zip 之外,您能想到一个 multiGroupJoin,或者在我们的例子中是 multiRightGroupJoin?

请问您喜欢什么。我知道事实上我对订阅/一次性使用和手册 onComplete 的使用不是这样,我只是不确定是什么..

0 投票
1 回答
84 浏览

android - Android - 泛型类型作为 ObservableTransformer 中的表达式

我将在我的应用程序中使用可观察转换器将数据从一种类型转换为另一种类型,因此为此我创建了一个带有一个参数的泛型类,<T1>其中 T1 是我期望获得输出的类型

例子

上面的类是转换器,它以字符串的形式获取输入,并根据给定的泛型类型 T1 将 String 转换为 someData 类型
所以我将像这样调用上面的类

有没有其他解决方案可以像这种变压器一样执行?任何帮助

0 投票
1 回答
639 浏览

android - Android - 带有 Rxkotlin 过滤器的 Kotlin 密封类

由于密封就像枚举对象,所以我决定使用密封类进行网络响应,如果成功则包含成功或失败,它包含数据否则错误消息
示例

上面的 Result 类有 Success 或 Failure

还有其他解决方案吗?
任何帮助

0 投票
2 回答
65 浏览

android - RxKotlin 中的区块链并继续

我的应用程序中有一个链接操作,我有一些数据的列表,iteratable observable一旦迭代完成,我将遍历列表,我将执行一些不依赖于前一个列表的其他操作,所以我需要的是在这两个操作之间,我需要阻塞(或屏障),直到第一个操作完成并继续第二个操作。

例子

谁能帮我解决这个问题?