0

我正在尝试仅使用 kotlin 实施退避策略flow

我需要从 timeA 到 timeB 获取数据

result = dataBetween(timeA - timeB)

如果结果为空,那么我想使用指数退避来增加结束时间窗口

result = dataBetween(timeA - timeB + exponentialBackOffInDays)

我正在关注这篇文章,该文章解释了如何在rxjava2.

但是卡在flow没有takeUntil操作员的地方。

你可以在下面看到我的实现。

fun main() {
    runBlocking {
        (0..8).asFlow()
            .flatMapConcat { input ->
                // To simulate a data source which fetches data based on a time-window start-date to end-date 
                // available with in that time frame.
                flow {
                    println("Input: $input")
                    if (input < 5) {
                        emit(emptyList<String>())
                    } else { // After emitting this once the flow should complete
                        emit(listOf("Available"))
                    }
                }.retryWhenThrow(DummyException(), predicate = {
                    it.isNotEmpty()
                })
            }.collect {
                //println(it)
            }
    }
}

class DummyException : Exception("Collected size is empty")

private inline fun <T> Flow<T>.retryWhenThrow(
    throwable: Throwable,
    crossinline predicate: suspend (T) -> Boolean
): Flow<T> {
    return flow {
        collect { value ->
            if (!predicate(value)) {
                throw throwable // informing the upstream to keep emitting since the condition is met
            }
            println("Value: $value")
            emit(value)
        }
    }.catch { e ->
        if (e::class != throwable::class) throw e
    }
}


它工作正常,除非流具有成功的值,流继续收集直到8从上游流,但理想情况下,它应该在到达5自身时停止。

关于我应该如何解决这个问题的任何帮助都会有所帮助。

4

1 回答 1

0

也许这与您的确切设置不匹配,但您collect最好不要调用,而是使用first{...}orfirstOrNull{...} 这将在找到元素后自动停止上游流。
例如:

flowOf(0,0,3,10)
    .flatMapConcat {
        println("creating list with $it elements")
        flow {
            val listWithElementCount = MutableList(it){ "" }  // just a list of n empty strings
            emit(listWithElementCount)
        }
    }.first { it.isNotEmpty() }

附带说明一下,您的问题听起来像常规的挂起功能会更合适。就像是

suspend fun getFirstNonEmptyList(initialFrom: Long, initialTo: Long): List<Any> {
    var from = initialFrom
    var to = initialTo
    while (coroutineContext.isActive) {
        val elements = getElementsInRange(from, to) // your  "dataBetween"
        if (elements.isNotEmpty()) return elements
        val (newFrom, newTo) = nextBackoff(from, to)
        from = newFrom
        to = newTo
    }
    throw CancellationException()
}
于 2020-07-21T11:28:56.573 回答