1

我需要.combineLatest()在 a 上实现以下扩展功能ReceiveChannel

suspend fun <A, B, R> ReceiveChannel<A>.combineLatest(
    otherSource: ReceiveChannel<B>,
    context: CoroutineContext = Unconfined,
    combineFunction: suspend (A, B) -> R
): ReceiveChannel<R> = produce(context) {
    // ?
}

我希望它像 RxJava 的combineLatest().

我怎样才能做到这一点?

编辑:到目前为止我有这个,但它不工作。该sourceB.consumeEach{ }块永远不会被执行。

suspend fun <A, B, R> ReceiveChannel<A>.combineLatest(
    otherSource: ReceiveChannel<B>,
    context: CoroutineContext = Unconfined,
    combineFunction: suspend (A, B) -> R
): ReceiveChannel<R> = produce(context) {

    val sourceA: ReceiveChannel<A> = this@combineLatest
    val sourceB: ReceiveChannel<B> = otherSource

    var latestA: A? = null
    var latestB: B? = null

    sourceA.consumeEach { a ->
        latestA = a
        if (latestA != null && latestB != null) {
            send(combineFunction(latestA!!, latestB!!))
        }
    }

    sourceB.consumeEach { b ->
        latestB = b
        if (latestA != null && latestB != null) {
            send(combineFunction(latestA!!, latestB!!))
        }
    }
}

我还想确保当ReceiveChannel<R>此函数返回的关闭(取消订阅)时,我想确保父频道正确关闭。

4

2 回答 2

0

我知道这是一个老问题,但这里有一个建议:

我会推荐使用.zip()而不是嵌套.consumeEach在此处查看文档。

sourceA.zip(sourceB).consumeEach{}产生 Pair 类型项目的可能解决方案。

于 2018-09-05T11:14:55.743 回答
0

这成功了!我仍然很困惑为什么我可以将一个嵌套.consumeEach{ }在另一个内部.consumeEach { }- 这似乎不直观。

suspend fun <A, B, R> ReceiveChannel<A>.combineLatest(
    otherSource: ReceiveChannel<B>,
    context: CoroutineContext = Unconfined,
    combineFunction: suspend (A, B) -> R
): ReceiveChannel<R> = produce(context) {

    val sourceA: ReceiveChannel<A> = this@combineLatest
    val sourceB: ReceiveChannel<B> = otherSource

    val latestA = AtomicReference<A>()
    val latestB = AtomicReference<B>()

    var aInitialized = false
    var bInitialized = false

    sourceA.consumeEach { a ->
        latestA.set(a)
        aInitialized = true
        if (aInitialized && bInitialized) {
            send(combineFunction(latestA.get(), latestB.get()))
        }

        launch(coroutineContext) {
            sourceB.consumeEach { b ->
                latestB.set(b)
                bInitialized = true
                if (aInitialized && bInitialized) {
                    send(combineFunction(latestA.get(), latestB.get()))
                }
            }
        }
    }
}
于 2018-03-11T07:31:37.753 回答