1

如何在 Reactive x 中(理想情况下使用 RxJava 或 RxJs 中的示例)可以实现这一点?

a |-a-------------------a-----------a-----------a----
s1 |-x-x-x-x-x-x -| (subscribe)
s2                       |-x-x-x-x-x-| (subscribe)
s2                                               |-x-x-x-x-x-| (subscribe)
...
sn
S |-x-x-x-x-x-x-x-------x-x-x-x-x-x-x-------------x-x-x-x-x-x- (subsribe)

a是一个无限的事件流,它触发有限sn的事件流,每个事件都应该是无限流的一部分,S同时能够订阅每个sn流(为了进行求和操作),但同时保持流S为无限。

编辑:更具体地说,我提供了我在 Kotlin 中寻找的实现。每 10 秒发出一个事件,该事件映射到 4 个事件的共享有限流。元流被flatMap编辑成正常的无限流。我利用doAfterNext额外订阅每个有限流并打印出结果。

/** Creates a finite stream with events
 * $ch-1 - $ch-4
 */
fun createFinite(ch: Char): Observable<String> =
        Observable.interval(1, TimeUnit.SECONDS)
                .take(4)
                .map({ "$ch-$it" }).share()

fun main(args: Array<String>) {

    var ch = 'A'

    Observable.interval(10, TimeUnit.SECONDS).startWith(0)
            .map { createFinite(ch++) }
            .doAfterNext {
                it
                        .count()
                        .subscribe({ c -> println("I am done. Total event count is $c") })
            }
            .flatMap { it }
            .subscribe { println("Just received [$it] from the infinite stream ") }

    // Let main thread wait forever
    CountDownLatch(1).await()
}

但是我不确定这是否是“纯 RX”方式。

4

1 回答 1

0

你不清楚你想如何进行计数。如果你在做一个总计数,那么就不需要做内部订阅:

AtomicLong counter = new AtomicLong()
Observable.interval(10, TimeUnit.SECONDS).startWith(0)
        .map { createFinite(ch++) }
        .flatMap { it }
        .doOnNext( counter.incrementAndget() )
        .subscribe { println("Just received [$it] from the infinite stream ") }

另一方面,如果您需要为每个中间 observable 提供一个计数,那么您可以将计数移动到内部flatMap()并打印出计数并在完成时将其重置:

AtomicLong counter = new AtomicLong()
Observable.interval(10, TimeUnit.SECONDS).startWith(0)
        .map { createFinite(ch++) }
        .flatMap { it
                     .doOnNext( counter.incrementAndget()
                     .doOnCompleted( { long ctr = counter.getAndSet(0)
                                        println("I am done. Total event count is $ctr")
                                     } )
        .subscribe { println("Just received [$it] from the infinite stream ") }

这不是很实用,但是这种报告往往会破坏正常的流。

于 2017-09-04T16:41:22.047 回答