0

经过几天的努力,似乎是一个简单的任务,我来找你们:)

想法很简单。我有两个流/可观察对象,“左”和“右”。我希望“右”中的项目缓冲/收集/聚合到“左”中的“当前”项目。
因此,“左”中的每个项目都定义了一个新的“窗口”,而所有“右”项目都将绑定到该窗口,直到发出新的“左”项目。因此,可视化:

任务:
'左':|- A - - - - B - - C - - - -|
'正确' : |- 1 - 2 - 3 -4 - 5 - 6 - - -|
'结果' : |- - - - - - - -x - - -y - - - -z| ( Pair<Left, List<Right>>)
其中:A,1B,4 (所以 x) ; C (so y) 同时发出
所以:x = Pair(A, [1,2,3]), y = Pair(B, [4, 5])
并且:'right' & 'result' 完成/当 'left' 完成时终止
So: z = Pair(C, [6]) - 由于 'left' 完成而发出

----
编辑 2 - 最终解决方案!
为了将“右”项与下一个“左”而不是前一个聚合,我将代码更改为更短/更简单的代码:

fun <L, R> Observable<L>.rightGroupJoin(right: Observable<R>): Observable<Pair<L, List<R>>> {
    return this.share().run {
        zipWith(right.buffer(this), BiFunction { left, rightList ->
            Pair(left, rightList)
        })
    }
}  

编辑 1 - 初始解决方案!
取自下面@Mark(已接受)的答案,这就是我想出的。
它被分成更小的方法,因为我也可以multiRightGroupJoin()根据需要加入尽可能多的(正确的)流。

fun <T, R> Observable<T>.rightGroupJoin(right: Observable<R>): Observable<Pair<T, List<R>>> {
    return this.share().let { thisObservable ->    //use 'share' to avoid multi-subscription complications, e.g. multi calls to **preceding** doOnComplete
        thisObservable.flatMapSingle { t ->        //treat each 'left' as a Single
            bufferRightOnSingleLeft(thisObservable, t, right)
        }
    }
}

在哪里:

private fun <T, R> bufferRightOnSingleLeft(left: Observable<*>, leftSingleItem: T, right: Observable<R>)
    : Single<Pair<T, MutableList<R>>> {

    return right.buffer(left)                              //buffer 'right' until 'left' onNext() (for each 'left' Single) 
        .map { Pair(leftSingleItem, it) }
        .first(Pair(leftSingleItem, emptyList()))   //should be only 1 (list). THINK firstOrError
}  

----

到目前为止我得到了什么
经过大量阅读并了解不知何故没有开箱即用的实现,我决定使用groupJoin,主要使用这个链接,就像这样:(这里有很多问题和需要改进的地方,不要使用此代码)

private fun <T, R> Observable<T>.rightGroupJoin(right: Observable<R>): Observable<Pair<T, List<R>>> {

var thisCompleted = false //THINK is it possible to make the groupJoin complete on the left(this)'s onComplete automatically?
val thisObservable = this.doOnComplete { thisCompleted = true }
        .share() //avoid weird side-effects of multiple onSubscribe calls

//join/attach 'right/other' stream to windows (buffers), starting and ending on each 'this/left' onNext
return thisObservable.groupJoin(

    //bind 'right/other' stream to 'this/left'
    right.takeUntil { thisCompleted }//have an onComplete rule THINK add share() at the end?

    //define when windows start/end ('this/left' onNext opens new window and closes prev)
    , Function<T, ObservableSource<T>> { thisObservable }

    //define 'right/other' stream to have no windows/intervals/aggregations by itself
    // -> immediately bind each emitted item to a 'current' window(T) above
    , Function<R, ObservableSource<R>> { Observable.empty() }

    //collect the whole 'right' stream in 'current' ('left') window
    , BiFunction<T, Observable<R>, Single<Pair<T, List<R>>>> { t, rObs ->
        rObs.collect({ mutableListOf<R>() }) { acc, value ->
            acc.add(value)
        }.map { Pair(t, it.toList()) }

    }).mergeAllSingles()
}  

我还使用类似的用法来创建一个timedBuffer()- 与每个缓冲区()相同buffer(timeout)但带有时间戳,List以了解它何时开始。Observable.interval(timeout)基本上通过在(作为“左”)上运行相同的代码

问题/问题(从最简单到最难)

  1. 这是做类似事情的最佳方式吗?这不是矫枉过正吗?
  2. 当“左”完成时,是否有更好的方法(必须)来完成“结果”(和“右”)?没有这个丑陋的布尔逻辑?
  3. 这种用法似乎弄乱了 rx 的顺序。请参阅下面的代码和打印:

    leftObservable
    .doOnComplete {
        log("doOnComplete - before join")
     }
    .doOnComplete {
        log("doOnComplete 2 - before join")
     }
    .rightGroupJoin(rightObservable)
    .doOnComplete {
        log("doOnComplete - after join")
     }
    

打印(有时!看起来像竞争条件)以下内容:
doOnComplete - before join
doOnComplete - after join
doOnComplete 2 - before join

  1. 上述代码第一次运行时,doOnComplete - after join不调用,第二次调用两次。第三次就像第一次,第四次就像第二次,等等......
    3,4 都是使用此代码运行的。可能与 subscribe {} 用法有关?请注意,我不持有一次性用品。这个流结束了,因为我 GC 'left' observable

    leftObservable.subscribeOn().observeOn()
    .doOnComplete{log...}
    .rightGroupJoin()
    .doOnComplete{log...}
    .subscribe {}  
    

注意1:添加.takeUntil { thisCompleted }之后mergeAllSingles()似乎修复了#4。

注意2:在使用此方法加入多个流并应用'Note1'后,很明显onComplete(在groupJoin()调用之前!!!)将被调用的次数与'right' Observables一样多,可能意味着原因是right.takeUntil { thisCompleted },关闭“正确”流真的很重要吗?

Note3:关于 Note1,它似乎与 takeUntil 与 takeWhile 非常相关。使用 takeWhile 降低了 doOnComplete 调用,这在某种程度上是合乎逻辑的。仍在尝试更好地解决它。

  1. 除了在 groupJoin * rightObservablesCount 上运行 zip 之外,您能想到一个 multiGroupJoin,或者在我们的例子中是 multiRightGroupJoin?

请问您喜欢什么。我知道事实上我对订阅/一次性使用和手册 onComplete 的使用不是这样,我只是不确定是什么..

4

2 回答 2

1

像这样简单的东西应该可以工作:

@JvmStatic
fun main(string: Array<String>) {
    val left = PublishSubject.create<String>()
    val right = PublishSubject.create<Int>()

    left.flatMapSingle { s ->  right.buffer(left).map { Pair(s, it) }.firstOrError() }
            .subscribe{ println("Group : Letter : ${it.first}, Elements : ${it.second}") }


    left.onNext("A")
    right.onNext(1)
    right.onNext(2)
    right.onNext(3)
    left.onNext("B")
    right.onNext(4)
    right.onNext(5)
    left.onNext("C")
    right.onNext(6)
    left.onComplete()
}

输出 :

Group : Letter : A, Elements : [1, 2, 3]
Group : Letter : B, Elements : [4, 5]
Group : Letter : C, Elements : [6]

Observable感兴趣的是左边,所以订阅它。然后只需通过左 observable 的下一个发射或完成来缓冲右。您只对每个上游左发射的单个结果感兴趣,所以只需使用flatMapSingle. 我选择了firstOrError(),但显然可以有一个默认项目或其他错误处理,甚至flatMapMaybe加上firstElement()

编辑

OP 进行了进一步的问答,发现原始问题和上述解决方案用前一个左发射缓冲右值,直到下一个左发射(如上),不是必需的行为。新要求的行为是将右值缓冲到 NEXT 左发射,如下所示:

@JvmStatic
    fun main(string: Array<String>) {
        val left = PublishSubject.create<String>()
        val right = PublishSubject.create<Int>()


        left.zipWith (right.buffer(left), 
                BiFunction<String, List<Int>, Pair<String, List<Int>>> { t1, t2 -> Pair(t1, t2)
        }).subscribe { println("Group : Letter : ${it.first}, Elements : ${it.second}") }

        left.onNext("A")
        right.onNext(1)
        right.onNext(2)
        right.onNext(3)
        left.onNext("B")
        right.onNext(4)
        right.onNext(5)
        left.onNext("C")
        right.onNext(6)
        left.onComplete()
    }

这会产生不同的最终结果,因为左值与先前的右值一起压缩,直到下一个左发射(反向)。

输出 :

Group : Letter : A, Elements : []
Group : Letter : B, Elements : [1, 2, 3]
Group : Letter : C, Elements : [4, 5]
于 2019-04-17T23:32:56.090 回答
0

乍一看,我会scan在这里使用 2 s。例子:

data class Result(val left: Left?, val rightList: List<Right>) {
    companion object {
        val defaultInstance: Result = Result(null, listOf())
    }
}

leftObservable.switchMap { left -> 
    rightObservable.scan(listOf()) {list, newRight -> list.plus(newRight)}
        .map { rightsList -> Result(left, rightList) }
}
.scan(Pair(Result.defaultInstance, Result.defaultInstance)) { oldPair, newResult -> 
    Pair(oldPair.second, newResult)
}
.filter { it.first != it.second }
.map { it.first }

这里唯一的问题是处理onComplete,不知道怎么做

于 2019-04-17T18:53:50.070 回答