我正在调查在我当前的 Android 应用程序中使用 Kotlin Flow
我的应用程序通过 Retrofit API 调用从远程服务器检索数据。
其中一些 API 在 500 个项目页面中返回 50,000 个数据项。
每个 API 响应都包含一个包含下一页完整 URL 的 HTTP 链接标头。
这些调用最多可能需要 2 秒才能完成。
为了减少经过的时间,我采用了 Kotlin Flow 来同时处理每一页数据,同时还进行下一页 API 调用。
我的流程定义如下:
private val persistenceThreadPool = Executors.newFixedThreadPool(3).asCoroutineDispatcher()
private val internalWorkWorkState = MutableStateFlow<Response<List<MyPage>>?>(null)
private val workWorkState = internalWorkWorkState.asStateFlow()
private val myJob: Job
init {
myJob = GlobalScope.launch(persistenceThreadPool) {
workWorkState.collect { page ->
if (page == null) {
} else managePage(page!!)
}
}
}
我的递归函数定义如下,它获取所有页面:-
private suspend fun managePages(accessToken: String, response: Response<List<MyPage>>) {
when {
result != null -> return
response.isSuccessful -> internalWorkWorkState.emit(response)
else -> {
manageError(response.errorBody())
result = Result.failure()
return
}
}
response.headers().filter { it.first == HTTP_HEADER_LINK && it.second.contains(REL_NEXT) }.forEach {
val parts = it.second.split(OPEN_ANGLE, CLOSE_ANGLE)
if (parts.size >= 2) {
managePages(accessToken, service.myApiCall(accessToken, parts[1]))
}
}
}
private suspend fun managePage(response: Response<List<MyPage>>) {
val pages = response.body()
pages?.let {
persistResponse(it)
}
}
private suspend fun persistResponse(myPage: List<MyPage>) {
val myPageDOs = ArrayList<MyPageDO>()
myPage.forEach { page ->
myPageDOs.add(page.mapDO())
}
database.myPageDAO().insertAsync(myPageDOs)
}
我的许多问题是
此代码不会插入我检索到的所有数据项
检索到所有数据项后如何完成流程
检索并保存所有数据项后,如何完成 GlobalScope 作业
更新
通过进行以下更改,我设法插入了所有数据
private val persistenceThreadPool = Executors.newFixedThreadPool(3).asCoroutineDispatcher()
private val completed = CompletableDeferred<Int>()
private val channel = Channel<Response<List<MyPage>>?>(UNLIMITED)
private val channelFlow = channel.consumeAsFlow().flowOn(persistenceThreadPool)
private val frank: Job
init {
frank = GlobalScope.launch(persistenceThreadPool) {
channelFlow.collect { page ->
if (page == null) {
completed.complete(totalItems)
} else managePage(page!!)
}
}
}
...
...
...
channel.send(null)
completed.await()
return result ?: Result.success(outputData)
我不喜欢依赖 a CompletableDeferred
,有没有比这更好的方法来知道 Flow 什么时候完成了所有事情?