1

我正在尝试创建一个在超时后发出值的流,而不取消底层协程。这个想法是网络调用有 X 时间来完成并发出一个值,并且在达到超时之后,发出一些初始值而不取消底层工作(最终从网络调用发出值,假设它成功)。

像这样的东西似乎可以工作,但是当达到超时时它会取消底层协程。它也不处理在超时时发出一些默认值。

val someFlow = MutableStateFlow("someInitialValue")

val deferred = async {
    val networkCallValue = someNetworkCall()
    someFlow.emit(networkCallValue)
}

withTimeout(SOME_NUMBER_MILLIS) {
    deferred.await()
}

我希望能够在任何时候发出网络调用返回的值,如果达到超时,只需发出一些默认值。我将如何使用 Flow/Coroutines 完成此任务?

4

4 回答 4

1

一种方法是使用一个简单的select子句:

import kotlinx.coroutines.selects.*

val someFlow = MutableStateFlow("someInitialValue")

val deferred = async {
    someFlow.value = someNetworkCall()
}

// await the first of the 2 things, without cancelling anything
select<Unit> {
    deferred.onAwait {}
    onTimeout(SOME_NUMBER_MILLIS) {
        someFlow.value = someDefaultValue
    }
}

但是,如果它在多线程调度程序上运行,您将不得不注意竞争条件。如果异步在超时后完成,则默认值有可能覆盖网络响应。

防止这种情况的一种方法是,如果您知道网络不能返回与初始值相同的值(并且如果没有其他协程正在更改状态),则使用原子更新方法:

val deferred = async {
    val networkCallValue = someNetworkCall()
    someFlow.update { networkCallValue }
}

// await the first of the 2 things, without cancelling anything
val initialValue = someFlow.value
select<Unit> {
    deferred.onAwait {}
    onTimeout(300) {
        someFlow.update { current ->
            if (current == initialValue) {
                "someDefaultValue"
            } else {
                current // don't overwrite the network result
            }
        }
    }
}

如果您不能依赖状态比较,则可以使用 aMutex和布尔值保护对流的访问:

val someFlow = MutableStateFlow("someInitialValue")
val mutex = Mutex()
var networkCallDone = false

val deferred = async {
    val networkCallValue = someNetworkCall()
    mutex.withLock {
        someFlow.value = networkCallValue
        networkCallDone = true
    }
}

// await the first of the 2 things, without cancelling anything
select<Unit> {
    deferred.onAwait {}
    onTimeout(300) {
        mutex.withLock {
            if (!networkCallDone) {
                someFlow.value = "someDefaultValue"
            }
        }
    }
}
于 2022-01-12T18:14:13.303 回答
1

解决竞争条件的最简单方法可能是使用@Joffrey 的回答中的select() 。select()保证只执行一个分支。

但是,我认为同时改变共享流会使情况复杂化,并引入另一个我们需要解决的竞争条件。相反,我们可以非常轻松地做到这一点:

flow {
    val network = async { someNetworkCall() }
    select {
        network.onAwait{ emit(it) }
        onTimeout(1000) {
            emit("initial")
            emit(network.await())
        }
    }
}

没有竞争条件需要处理。我们只有两个简单的执行分支,这取决于首先发生了什么。

如果我们需要 aStateFlow那么我们可以使用stateIn()来转换常规流。或者我们可以在问题中使用MutableStateFlowas ,但只在内部对其进行变异select(),类似于上面:

select {
    network.onAwait{ someFlow.value = it }
    onTimeout(1000) {
        someFlow.value = "initial"
        someFlow.value = network.await()
    }
}
于 2022-01-12T21:56:32.163 回答
1

您可以同时启动两个协程,并在第二个协程中取消Job第一个协程,该协程负责发出默认值:

val someFlow = MutableStateFlow("someInitialValue")

val firstJob = launch {
    delay(SOME_NUMBER_MILLIS)
    ensureActive() // Ensures that current Job is active.
    someFlow.update {"DefaultValue"}
}
launch {
    val networkCallValue = someNetworkCall()
    firstJob.cancelAndJoin()
    someFlow.update { networkCallValue }
}
于 2022-01-12T20:23:50.920 回答
0

您可以同时发送网络请求和启动超时延迟。当网络调用成功时,使用响应更新 StateFlow。并且,当超时结束并且我们还没有收到响应时,使用默认值更新 StateFlow。

val someFlow = MutableStateFlow(initialValue)

suspend fun getData() {
    launch {
        someFlow.value = someNetworkCall()
    }
    delay(TIMEOUT_MILLIS)
    if(someFlow.value == initialValue)
        someFlow.value = defaultValue
}

如果网络调用的响应可以和 一样initialValue,你可以新建一个Boolean来检查网络请求的完成情况。另一种选择是存储Job返回的引用并在超时后launch检查是否。job.isActive

编辑:如果您想delay在网络请求完成时取消,您可以执行以下操作:

val someFlow = MutableStateFlow(initialValue)

suspend fun getData() {
    val job = launch {    
        delay(TIMEOUT_MILLIS)
        someFlow.value = defaultValue
    } 
    someFlow.value = someNetworkCall()
    job.cancel()
}

为了解决可能的并发问题,您可以使用 MutableStateFlow。update原子更新。

于 2022-01-12T17:54:02.990 回答