2

我花了太多时间试图解决这个问题。因此,我在下面发布的代码在下载文件方面确实有效,但问题是,流程具有非常意外的行为。方法调用似乎会阻塞,response.content.readAvailable()直到它完全完成下载整个文件,此时会发生发射进度,因此您最终要等待很长时间才能下载文件,然后在一瞬间您会获得所有进度更新。所以我想知道是否有办法做到这一点,我一次读取一定数量的字节,然后发出一个进度,然后重复直到文件完成下载?或者可能是一种挂钩 readAvailable() 方法并以这种方式更新进度的方法?对此的任何帮助将不胜感激。

这是我找到并修改的代码,但仍然无法正常工作:

suspend fun HttpClient.downloadFile(
    output: File,
    downloadUrl: String,
    md5Hash: String,
) = flow {
    try {
        val response = get<HttpResponse> { url(downloadUrl) }
        val data = ByteArray(response.contentLength()?.toInt() ?: 0)
        val contentLn = response.contentLength()?.toInt() ?: 0
        var offset = 0
        var bytesRemaining = contentLn
        do {
            val chunkSize = min(maxChunkSize, bytesRemaining)
            logger?.d { "Read Available:" }
            val result = response.content.readAvailable(data, offset, length = chunkSize)
            val progress = ((offset / contentLn.toDouble()) * 100).toInt()
            emit(DownloadResult.Progress(progress))
            logger?.d { "logged progress: $progress" }
            // delay(6000L) this was to test my assumption that the readAvalible was blocking. 
            offset += chunkSize
            bytesRemaining -= chunkSize
        } while (result != -1)

        if (response.status.isSuccess()) {
            if (data.md5().hex == md5Hash) {
                output.write(data)
                emit(DownloadResult.Success)
            } else {
                emit(DownloadResult.ErrorCorruptFile)
            }
        } else {
            emit(DownloadResult.ErrorBadResponseCode(response.status.value))
        }
    } catch (e: TimeoutCancellationException) {
        emit(DownloadResult.ErrorRequestTimeout("Connection timed out", e))
    }
}
4

1 回答 1

3

最后经过一段愚蠢的时间后,我解决了这个问题。你需要使用的是这个。这使您可以在下载时访问字节通道。

一个非常粗略的实现(我还没有完成)是这样的:

    get<HttpStatement>(url = downloadUrl).execute {
        var offset = 0
        val byteBufferSize = 1024 * 100
        val channel = it.receive<ByteReadChannel>()
        val contentLen = it.contentLength()?.toInt() ?: 0
        val data = ByteArray(contentLen)
        do {
            val currentRead = channel.readAvailable(data, offset, byteBufferSize)
            val progress = if(contentLen == 0) 0 else ( offset / contentLen.toDouble() ) * 100
            logger?.d { "progress: $progress" }
            offset += currentRead
        } while (currentRead >= 0)

    }

这个解决方案不能做两件事。1.) 我在 HttpClient 的上下文中,所以这就是我访问 get() 的方式。2.)我正在创建一个字节缓冲区大小,1024 * 100以便不让readAvailable方法阻塞太久,尽管这可能不是必需的......关于它的一个好处是它决定了你发布你的频率进度更新。

于 2020-12-03T00:28:01.177 回答