如何在 Reactive x 中(理想情况下使用 RxJava 或 RxJs 中的示例)可以实现这一点?
a |-a-------------------a-----------a-----------a----
s1 |-x-x-x-x-x-x -| (subscribe)
s2 |-x-x-x-x-x-| (subscribe)
s2 |-x-x-x-x-x-| (subscribe)
...
sn
S |-x-x-x-x-x-x-x-------x-x-x-x-x-x-x-------------x-x-x-x-x-x- (subsribe)
a
是一个无限的事件流,它触发有限sn
的事件流,每个事件都应该是无限流的一部分,S
同时能够订阅每个sn
流(为了进行求和操作),但同时保持流S
为无限。
编辑:更具体地说,我提供了我在 Kotlin 中寻找的实现。每 10 秒发出一个事件,该事件映射到 4 个事件的共享有限流。元流被flatMap
编辑成正常的无限流。我利用doAfterNext
额外订阅每个有限流并打印出结果。
/** Creates a finite stream with events
* $ch-1 - $ch-4
*/
fun createFinite(ch: Char): Observable<String> =
Observable.interval(1, TimeUnit.SECONDS)
.take(4)
.map({ "$ch-$it" }).share()
fun main(args: Array<String>) {
var ch = 'A'
Observable.interval(10, TimeUnit.SECONDS).startWith(0)
.map { createFinite(ch++) }
.doAfterNext {
it
.count()
.subscribe({ c -> println("I am done. Total event count is $c") })
}
.flatMap { it }
.subscribe { println("Just received [$it] from the infinite stream ") }
// Let main thread wait forever
CountDownLatch(1).await()
}
但是我不确定这是否是“纯 RX”方式。