经过几天的努力,似乎是一个简单的任务,我来找你们:)
想法很简单。我有两个流/可观察对象,“左”和“右”。我希望“右”中的项目缓冲/收集/聚合到“左”中的“当前”项目。
因此,“左”中的每个项目都定义了一个新的“窗口”,而所有“右”项目都将绑定到该窗口,直到发出新的“左”项目。因此,可视化:
任务:
'左':|- A - - - - B - - C - - - -|
'正确' : |- 1 - 2 - 3 -4 - 5 - 6 - - -|
'结果' : |- - - - - - - -x - - -y - - - -z| ( Pair<Left, List<Right>>
)
其中:A,1;B,4 (所以 x) ; C (so y) 同时发出
所以:x = Pair(A, [1,2,3]), y = Pair(B, [4, 5])
并且:'right' & 'result' 完成/当 'left' 完成时终止
So: z = Pair(C, [6]) - 由于 'left' 完成而发出
----
编辑 2 - 最终解决方案!
为了将“右”项与下一个“左”而不是前一个聚合,我将代码更改为更短/更简单的代码:
fun <L, R> Observable<L>.rightGroupJoin(right: Observable<R>): Observable<Pair<L, List<R>>> {
return this.share().run {
zipWith(right.buffer(this), BiFunction { left, rightList ->
Pair(left, rightList)
})
}
}
编辑 1 - 初始解决方案!
取自下面@Mark(已接受)的答案,这就是我想出的。
它被分成更小的方法,因为我也可以multiRightGroupJoin()
根据需要加入尽可能多的(正确的)流。
fun <T, R> Observable<T>.rightGroupJoin(right: Observable<R>): Observable<Pair<T, List<R>>> {
return this.share().let { thisObservable -> //use 'share' to avoid multi-subscription complications, e.g. multi calls to **preceding** doOnComplete
thisObservable.flatMapSingle { t -> //treat each 'left' as a Single
bufferRightOnSingleLeft(thisObservable, t, right)
}
}
}
在哪里:
private fun <T, R> bufferRightOnSingleLeft(left: Observable<*>, leftSingleItem: T, right: Observable<R>)
: Single<Pair<T, MutableList<R>>> {
return right.buffer(left) //buffer 'right' until 'left' onNext() (for each 'left' Single)
.map { Pair(leftSingleItem, it) }
.first(Pair(leftSingleItem, emptyList())) //should be only 1 (list). THINK firstOrError
}
----
到目前为止我得到了什么
经过大量阅读并了解不知何故没有开箱即用的实现,我决定使用groupJoin
,主要使用这个链接,就像这样:(这里有很多问题和需要改进的地方,不要使用此代码)
private fun <T, R> Observable<T>.rightGroupJoin(right: Observable<R>): Observable<Pair<T, List<R>>> {
var thisCompleted = false //THINK is it possible to make the groupJoin complete on the left(this)'s onComplete automatically?
val thisObservable = this.doOnComplete { thisCompleted = true }
.share() //avoid weird side-effects of multiple onSubscribe calls
//join/attach 'right/other' stream to windows (buffers), starting and ending on each 'this/left' onNext
return thisObservable.groupJoin(
//bind 'right/other' stream to 'this/left'
right.takeUntil { thisCompleted }//have an onComplete rule THINK add share() at the end?
//define when windows start/end ('this/left' onNext opens new window and closes prev)
, Function<T, ObservableSource<T>> { thisObservable }
//define 'right/other' stream to have no windows/intervals/aggregations by itself
// -> immediately bind each emitted item to a 'current' window(T) above
, Function<R, ObservableSource<R>> { Observable.empty() }
//collect the whole 'right' stream in 'current' ('left') window
, BiFunction<T, Observable<R>, Single<Pair<T, List<R>>>> { t, rObs ->
rObs.collect({ mutableListOf<R>() }) { acc, value ->
acc.add(value)
}.map { Pair(t, it.toList()) }
}).mergeAllSingles()
}
我还使用类似的用法来创建一个timedBuffer()
- 与每个缓冲区()相同buffer(timeout)
但带有时间戳,List
以了解它何时开始。Observable.interval(timeout)
基本上通过在(作为“左”)上运行相同的代码
问题/问题(从最简单到最难)
- 这是做类似事情的最佳方式吗?这不是矫枉过正吗?
- 当“左”完成时,是否有更好的方法(必须)来完成“结果”(和“右”)?没有这个丑陋的布尔逻辑?
这种用法似乎弄乱了 rx 的顺序。请参阅下面的代码和打印:
leftObservable .doOnComplete { log("doOnComplete - before join") } .doOnComplete { log("doOnComplete 2 - before join") } .rightGroupJoin(rightObservable) .doOnComplete { log("doOnComplete - after join") }
打印(有时!看起来像竞争条件)以下内容:
doOnComplete - before join
doOnComplete - after join
doOnComplete 2 - before join
上述代码第一次运行时,
doOnComplete - after join
不调用,第二次调用两次。第三次就像第一次,第四次就像第二次,等等......
3,4 都是使用此代码运行的。可能与 subscribe {} 用法有关?请注意,我不持有一次性用品。这个流结束了,因为我 GC 'left' observableleftObservable.subscribeOn().observeOn() .doOnComplete{log...} .rightGroupJoin() .doOnComplete{log...} .subscribe {}
注意1:添加.takeUntil { thisCompleted }
之后mergeAllSingles()
似乎修复了#4。
注意2:在使用此方法加入多个流并应用'Note1'后,很明显onComplete(在groupJoin()调用之前!!!)将被调用的次数与'right' Observables一样多,可能意味着原因是right.takeUntil { thisCompleted }
,关闭“正确”流真的很重要吗?
Note3:关于 Note1,它似乎与 takeUntil 与 takeWhile 非常相关。使用 takeWhile 降低了 doOnComplete 调用,这在某种程度上是合乎逻辑的。仍在尝试更好地解决它。
- 除了在 groupJoin * rightObservablesCount 上运行 zip 之外,您能想到一个 multiGroupJoin,或者在我们的例子中是 multiRightGroupJoin?
请问您喜欢什么。我知道事实上我对订阅/一次性使用和手册 onComplete 的使用不是这样,我只是不确定是什么..