我正在学习 kotlin 协程和流程,但有一件事对我来说有点晦涩难懂。如果我对常规协程有一个长时间运行的循环,我可以使用 isActive 或 ensureActive 来处理取消。但是,这些不是为流程定义的,但是以下代码正确地完成了流程:
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import org.slf4j.LoggerFactory
private val logger = LoggerFactory.getLogger("Main")
fun main() {
val producer = FlowProducer()
runBlocking {
producer
.produce()
.take(10)
.collect {
logger.info("Received $it")
}
}
logger.info("done")
}
class FlowProducer {
fun produce() = flow {
try {
var counter = 1
while (true) {
logger.info("Before emit")
emit(counter++)
logger.info("After emit")
}
}finally {
logger.info("Producer has finished")
}
}.flowOn(Dispatchers.IO)
}
为什么会有这种情况?是因为发射是一个可以为我处理取消的可挂起函数吗?如果有条件地调用了发射怎么办?例如,该循环实际上从 Kafka 轮询记录,并且仅在接收到的记录不为空时才调用发出。那么我们可以有这样的情况:
- 我们想要 10 条消息(取 10 条)
- 实际上,关于 kafka 主题的消息只有 10 条
- 由于没有更多消息,emit 不会再次被调用,因此即使我们收到了我们想要的所有消息,循环将继续在不必要的轮询上浪费资源。
不知道我的理解是否正确。在这种情况下,我应该在每个循环上调用 yield() 吗?