据我所知,没有内置的方法可以做到这一点,但你可以做一个:
fun <T> Flow<T>.repeated(times: Int): Flow<T> {
val map = mutableMapOf<T, Int>()
return transform { value ->
val count = (map[value] ?: 0) + 1
map[value] = count
if (count >= times) {
return@transform emit(value)
}
}
}
这使用地图来跟踪每个值已发出多少次。然后,如果一个值被发出足够多的次数,它就会通过转换被发出。
channel.asFlow()
.repeated(3)
.collect {
// Do something.
}
编辑:它绝对有效。这是测试代码:
suspend fun test() = coroutineScope {
val flow = MutableSharedFlow<String>()
launch {
flow.repeated(3).collect { value ->
println("received: $value")
}
}
launch {
while (true) {
val value = listOf("a", "b", "c").random()
println("emitted: $value")
flow.emit(value)
delay(1000)
}
}
}
我得到以下输出:
emitted: c
emitted: a
emitted: c
emitted: c
received: c
emitted: b
emitted: b
emitted: b
received: b
emitted: a
emitted: a
received: a
emitted: a
received: a
emitted: b
received: b