9

我有以下代码:

val channel = BroadcastChannel<Event>(10)

fun setup() {
    scope.launch {
        channel.asFlow().
            .flatMapLatest { fetchSomeData() }
            .catch { emit(DefaultData()) }
            .onEach { handleData() }
            .collect()

    }
}

fun load() {
    channel.offer(Event.Load)      
}

如果fetchSomeData因异常而失败,它将被捕获catch并传递一些默认数据。问题是流程本身被取消并且正在从频道的订阅者中删除。这意味着提供给频道的任何新事件都将被忽略,因为不再有任何订阅者。

有没有办法确保在发生异常时流程不会被取消?

4

3 回答 3

2

您应该捕获 fetchSomeData() 的异常,因此catch从主要流程转到 fetchSomeData():

    scope.launch {
        channel.asFlow().
            .flatMapLatest { fetchSomeData().catch { emit(DefaultData()} }
            .onEach { handleData() }
            .collect()

    }
于 2020-09-21T09:08:05.603 回答
-1

我遇到了同样的问题。我的解决方法是这样的:

/* Custom onEach extension function */
fun <T> Flow<T>.onEachCatching(block: suspend (T) -> Unit) = OnEachCatching(this, block)

class OnEachCatching<T>(private val src: Flow<T>, private val block: suspend (T) -> Unit, bufferCapacity: Int = Channel.CONFLATED) {

    private val okValue = Channel<T>(bufferCapacity)

    private var failBlock: (suspend (Throwable) -> Unit)? = null

    init {
        GlobalScope.launch {
            src.collect { value ->
                runCatching { block(value) }
                    .onFailure { failBlock?.invoke(it) }
                    .onSuccess { okValue.send(value) }
            }

            okValue.close()
        }
    }

    fun onFailure(block: suspend (Throwable) -> Unit) = this.also {
        failBlock = block
    }

    fun resumeFlow() = okValue.consumeAsFlow()
}

用法:

someData
    .onEachCatching { handleData() }
    .onFailure { emit(DefaultData()) }
    .resumeFlow()
    .collect()
于 2020-09-14T14:14:02.927 回答
-1

我的设计是简单地重新启动收集流程

private fun startParsingMessages() {
    coroutineScope?.launch {
        sessionController.subscribeToMessages()
            .onEach { /*code block*/ }
            .catch {
                it.cause
                    ?.let { error -> Timber.e(error) }
                    ?: Timber.e("startSession(): ${it.message}")

                startParsingMessages() //here
            }
            .collect()
    }
}
于 2021-01-02T20:18:05.877 回答