我正在尝试仅使用 kotlin 实施退避策略flow
。
我需要从 timeA 到 timeB 获取数据
result = dataBetween(timeA - timeB)
如果结果为空,那么我想使用指数退避来增加结束时间窗口
result = dataBetween(timeA - timeB + exponentialBackOffInDays)
我正在关注这篇文章,该文章解释了如何在rxjava2
.
但是卡在还flow
没有takeUntil
操作员的地方。
你可以在下面看到我的实现。
fun main() {
runBlocking {
(0..8).asFlow()
.flatMapConcat { input ->
// To simulate a data source which fetches data based on a time-window start-date to end-date
// available with in that time frame.
flow {
println("Input: $input")
if (input < 5) {
emit(emptyList<String>())
} else { // After emitting this once the flow should complete
emit(listOf("Available"))
}
}.retryWhenThrow(DummyException(), predicate = {
it.isNotEmpty()
})
}.collect {
//println(it)
}
}
}
class DummyException : Exception("Collected size is empty")
private inline fun <T> Flow<T>.retryWhenThrow(
throwable: Throwable,
crossinline predicate: suspend (T) -> Boolean
): Flow<T> {
return flow {
collect { value ->
if (!predicate(value)) {
throw throwable // informing the upstream to keep emitting since the condition is met
}
println("Value: $value")
emit(value)
}
}.catch { e ->
if (e::class != throwable::class) throw e
}
}
它工作正常,除非流具有成功的值,流继续收集直到8
从上游流,但理想情况下,它应该在到达5
自身时停止。
关于我应该如何解决这个问题的任何帮助都会有所帮助。