0

我有使用callbackFlow这样的协程代码:

fun getUniqueEventAsFlow(receiverId: String): Flow<Any> = callbackFlow {
    RxEventBus().register(
        receiverId,
        FirstUniqueEvent::class.java,
        false
    ) { amEvent ->
        offer(amEvent)
    }
    // Suspend until either onCompleted or external cancellation are invoked
    awaitClose {
        unsubscribeFromEventBus(receiverId)
        cancel()
    }
}.flowOn(Dispatchers.Default)
    .catch { throwable ->
        reportError(throwable as Exception)
    }

我想做的是包装以下内容,以便可以自动调用它,因为我在代码中有许多类似的功能:

        // Suspend until either onCompleted or external cancellation are invoked
        awaitClose {
            unsubscribeFromEventBus(receiverId)
            cancel()
        }
    }.flowOn(Dispatchers.Default)
        .catch { throwable ->
            reportError(throwable as Exception)
        }

我想包装一次awaitCloseflowOn,而不必为每个callbackFlow编写它。你知道我可以使用哪个 Kotlin 高阶构造来实现这一点吗?

谢谢你,伊戈尔

4

1 回答 1

0

这是包装awaitClose和的解决方案handleErrors

/**
 * Utility function which suspends a coroutine until it is completed or closed.
 * Unsubscribes from Rx Bus event and ensures that the scope is cancelled upon completion.
 */
suspend fun finalizeFlow(scope: ProducerScope<Any>, receiverId: String) {
    scope.awaitClose {
        unsubscribeFromEventBus(receiverId)
        scope.cancel()
    }

    scope.invokeOnClose {
        Logger.debug(
            javaClass.canonicalName,
            "Closed Flow channel for receiverId $receiverId"
        )
    }
}

 /**
  * Extension function which does error handling on [Flow].
  */
 fun <T> Flow<T>.handleErrors(): Flow<T> = catch { throwable ->
     reportError(throwable as Exception)
 }
于 2020-08-13T20:18:42.817 回答