2

我正在调查在我当前的 Android 应用程序中使用 Kotlin Flow

我的应用程序通过 Retrofit API 调用从远程服务器检索数据。

其中一些 API 在 500 个项目页面中返回 50,000 个数据项。

每个 API 响应都包含一个包含下一页完整 URL 的 HTTP 链接标头。

这些调用最多可能需要 2 秒才能完成。

为了减少经过的时间,我采用了 Kotlin Flow 来同时处理每一页数据,同时还进行下一页 API 调用。

我的流程定义如下:

private val persistenceThreadPool = Executors.newFixedThreadPool(3).asCoroutineDispatcher()
private val internalWorkWorkState = MutableStateFlow<Response<List<MyPage>>?>(null)
private val workWorkState = internalWorkWorkState.asStateFlow()

private val myJob: Job

init {
    myJob = GlobalScope.launch(persistenceThreadPool) {
        workWorkState.collect { page ->
            if (page == null) {
            } else managePage(page!!)
        }
    }
}

我的递归函数定义如下,它获取所有页面:-

    private suspend fun managePages(accessToken: String, response: Response<List<MyPage>>) {
        when {
            result != null -> return
            response.isSuccessful -> internalWorkWorkState.emit(response)
            else -> {
                manageError(response.errorBody())
                result = Result.failure()
                return
            }
        }

        response.headers().filter { it.first == HTTP_HEADER_LINK && it.second.contains(REL_NEXT) }.forEach {
            val parts = it.second.split(OPEN_ANGLE, CLOSE_ANGLE)
            if (parts.size >= 2) {
                managePages(accessToken, service.myApiCall(accessToken, parts[1]))
            }
        }
    }

   private suspend fun managePage(response: Response<List<MyPage>>) {
        val pages = response.body()

        pages?.let {
            persistResponse(it)
        }
    }

    private suspend fun persistResponse(myPage: List<MyPage>) {
        val myPageDOs = ArrayList<MyPageDO>()

        myPage.forEach { page ->
            myPageDOs.add(page.mapDO())
        }

        database.myPageDAO().insertAsync(myPageDOs)
    }
    

我的许多问题是

  1. 此代码不会插入我检索到的所有数据项

  2. 检索到所有数据项后如何完成流程

  3. 检索并保存所有数据项后,如何完成 GlobalScope 作业

更新

通过进行以下更改,我设法插入了所有数据

 private val persistenceThreadPool = Executors.newFixedThreadPool(3).asCoroutineDispatcher()
    private val completed = CompletableDeferred<Int>()

    private val channel = Channel<Response<List<MyPage>>?>(UNLIMITED)
    private val channelFlow = channel.consumeAsFlow().flowOn(persistenceThreadPool)

    private val frank: Job

    init {
        frank = GlobalScope.launch(persistenceThreadPool) {
            channelFlow.collect { page ->
                if (page == null) {
                    completed.complete(totalItems)
                } else managePage(page!!)
            }
        }
    }


...
...
...

   channel.send(null)
   completed.await()

   return result ?: Result.success(outputData)

我不喜欢依赖 a CompletableDeferred,有没有比这更好的方法来知道 Flow 什么时候完成了所有事情?

4

2 回答 2

2

您正在寻找流程构建器Flow.buffer()

suspend fun getData(): Flow<Data> = flow {
    var pageData: List<Data>
    var pageUrl: String? = "bla"
    while (pageUrl != null) {
        TODO("fetch pageData from pageUrl and change pageUrl to the next page")
        emitAll(pageData)
    }
}
    .flowOn(Dispatchers.IO /* no need for a thread pool executor, IO does it automatically */)
    .buffer(3)

你可以像普通的流、迭代等一样使用它。如果你想知道输出的总长度,你应该用一个可变的闭包变量在消费者上计算它。请注意,您不需要在任何地方(理想情况下)使用 GlobalScope。

于 2021-01-02T19:00:22.520 回答
0

有几种方法可以实现所需的行为。我建议使用专为并行分解设计的coroutineScope 。它还提供了开箱即用的良好取消和错误处理行为。结合Channel.close 行为,它使实现变得非常简单。从概念上讲,实现可能如下所示:

 suspend fun fetchAllPages() {
    coroutineScope {
        val channel = Channel<MyPage>(Channel.UNLIMITED)
        launch(Dispatchers.IO){ loadData(channel) }
        launch(Dispatchers.IO){ processData(channel) }
    }
}

suspend fun loadData(sendChannel: SendChannel<MyPage>){
    while(hasMoreData()){
        sendChannel.send(loadPage())
    }
    sendChannel.close()
}

suspend fun processData(channel: ReceiveChannel<MyPage>){
    for(page in channel){
        // process page
    }
}

它的工作方式如下:

  1. coroutineScope暂停,直到所有孩子完成。所以你不再需要CompletableDeferred了。
  2. loadData()循环加载页面并将其发布到频道中。加载完所有页面后,它会立即关闭频道。
  3. processData从通道中一一获取项目并处理它们。一旦处理完所有项目(并且通道已关闭),循环将立即结束。

在这个实现中,生产者协程独立工作,没有背压,因此如果处理速度很慢,它可能会占用大量内存。限制缓冲区容量以在缓冲区已满时让生产者协程挂起。使用通道扇出行为来启动多个处理器以加快计算速度也是一个好主意。

于 2021-01-01T13:11:13.417 回答