有没有一种简单的方法来实现这样的事情
取决于你的直截了当。这是我将如何做到的。
背压转化为协程世界中的程序化暂停和恢复。对于onBackpressureDrop
,下游必须指示它已准备好接受一项并暂停它,而上游不应等待下游准备好。
您必须以无限制的方式消耗上游并将项目和终端事件移交给下游等待这些信号。
package hu.akarnokd.kotlin.flow.impl
import hu.akarnokd.kotlin.flow.Resumable
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.AbstractFlow
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.collect
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference
@FlowPreview
internal class FlowOnBackpressureDrop<T>(private val source: Flow<T>)
: AbstractFlow<T>() {
@ExperimentalCoroutinesApi
@InternalCoroutinesApi
override suspend fun collectSafely(collector: FlowCollector<T>) {
coroutineScope {
val consumerReady = AtomicBoolean()
val producerReady = Resumable()
val value = AtomicReference<T>()
val done = AtomicBoolean()
val error = AtomicReference<Throwable>();
launch {
try {
source.collect {
if (consumerReady.get()) {
value.set(it);
consumerReady.set(false);
producerReady.resume();
}
}
done.set(true)
} catch (ex: Throwable) {
error.set(ex)
}
producerReady.resume()
}
while (true) {
consumerReady.set(true)
producerReady.await()
val d = done.get()
val ex = error.get()
val v = value.getAndSet(null)
if (ex != null) {
throw ex;
}
if (d) {
break;
}
collector.emit(v)
}
}
}
}
注意:可恢复的实施。
所以让我们来看看实现。
首先,需要 5 个变量在上游的收集器和为下游工作的收集器之间传递信息: -consumerReady
表示下游已准备好下一个项目, -producerReady
表示生产者已存储下一个项目(或终端信号)和下游可以恢复 -value
上游项目准备消费 -done
上游已结束 -error
上游已失败
接下来,我们必须为上游启动收集器,因为收集正在暂停,并且在完成之前不会让下游消费者循环运行。在这个收集器中,我们检查下游消费者是否准备好(通过consumerReady
),如果是,则存储当前项目,清除准备就绪标志并通过 指示其可用性producerReady
。清除consumerReady
将阻止后续上游项目被存储,直到下游本身指示新的准备就绪。
当上游结束或崩溃时,我们设置done
orerror
变量并指示生产者已发言。
在这launch { }
部分之后,我们现在将代表下游收集器继续使用共享变量。
每一轮的第一件事是表明我们已准备好接受下一个值,然后等待生产者方发出信号,它已将下一个事件放入共享变量中。
接下来,我们从这些变量中收集值。我们急于完成或抛出错误,并且仅作为最后的手段将上游项目重新发送到下游收集器。