我需要.combineLatest()
在 a 上实现以下扩展功能ReceiveChannel
suspend fun <A, B, R> ReceiveChannel<A>.combineLatest(
otherSource: ReceiveChannel<B>,
context: CoroutineContext = Unconfined,
combineFunction: suspend (A, B) -> R
): ReceiveChannel<R> = produce(context) {
// ?
}
我希望它像 RxJava 的combineLatest()
.
我怎样才能做到这一点?
编辑:到目前为止我有这个,但它不工作。该sourceB.consumeEach{ }
块永远不会被执行。
suspend fun <A, B, R> ReceiveChannel<A>.combineLatest(
otherSource: ReceiveChannel<B>,
context: CoroutineContext = Unconfined,
combineFunction: suspend (A, B) -> R
): ReceiveChannel<R> = produce(context) {
val sourceA: ReceiveChannel<A> = this@combineLatest
val sourceB: ReceiveChannel<B> = otherSource
var latestA: A? = null
var latestB: B? = null
sourceA.consumeEach { a ->
latestA = a
if (latestA != null && latestB != null) {
send(combineFunction(latestA!!, latestB!!))
}
}
sourceB.consumeEach { b ->
latestB = b
if (latestA != null && latestB != null) {
send(combineFunction(latestA!!, latestB!!))
}
}
}
我还想确保当ReceiveChannel<R>
此函数返回的关闭(取消订阅)时,我想确保父频道正确关闭。