3

使用Rx一个可以合并多个订阅源,如下所示

// psudo data repository

fun getAllData(): Flowable<DataType> {
    return getCachedData().mergeWith(getRemoteData())
}

fun getCachedData(): Flowable<DataType> {
    // local database call
}

fun getRemoteData(): Flowable<DataType> {
    // network call
}

在上面的代码中,getAllData()一旦合并Flowables返回之一,将立即返回数据,然后在准备好后发送另一个。

问题是,如何使用 Kotlin 协程实现相同的结果produce

4

1 回答 1

5

您可以创建一个组合通道,produce在其中启动两个使用两个输入通道的协程并将其重新发送到组合通道。

这是一个将多个相同类型的接收通道合并为一个的功能。

/**
 * Merges multiple [channels] into one channel.
 * All elements of all channels are sent to the combined channel in the order they arrive on the input channels.
 */
fun <T> CoroutineScope.mergeChannels(vararg channels: ReceiveChannel<T>) : ReceiveChannel<T> {
    return produce {
        channels.forEach {
            launch { it.consumeEach { send(it) }}
        }
    }
}

你可以像这样使用它:

fun main() = runBlocking<Unit> {
    val every100Ms = produce {
        repeat(10) {
            send("every 100: $it")
            delay(100)
        }
    }

    val every200Ms = produce {
        repeat(10) {
            send("every 200: $it")
            delay(200)
        }
    }

    val combined = mergeChannels(every100Ms, every200Ms)
    combined.consumeEach { println(it) }
}
于 2019-06-13T16:04:06.557 回答