1

我怎样才能转换io.ktor.utils.io.ByteReadChannelkotlinx.coroutines.flow.Flow<java.nio.ByteBuffer>

我将Ktor与此路由一起使用:

        post("/upload") {
            val channel: ByteReadChannel = call.receiveChannel()
            val flow: Flow<ByteBuffer> = channel.asByteBufferFlow() // my custom extension method
            transaction.execute {
                testDao.saveFile(flow)
            }
            call.respond("OK")
        }

DAO 像这样使用R2DBCBlob

    override suspend fun saveFile(input: Flow<ByteBuffer>) {
        val connection = requireR2DBCTransactionConnection()
        val publisher: Publisher<ByteBuffer> = input.asPublisher()
        val statement: Statement = connection.createStatement("insert into bindata (data) values ($1)")
        statement.bind(0, Blob.from(publisher))
        val count: Int = statement.execute().awaitFirst().rowsUpdated.awaitFirst()
        if (count != 1) {
            throw IllegalStateException()
        }
    }

我尝试编写此扩展方法,但失败了:

fun ByteReadChannel.asByteBufferFlow(): Flow<ByteBuffer> = object : AbstractFlow<ByteBuffer>() {

    override suspend fun collectSafely(collector: FlowCollector<ByteBuffer>) {
        /* I have no idea */
    }

}

我的主要问题是我没有找到任何类似的样本,而且两者ByteBufferByteReadChannel我来说都是新的。

4

0 回答 0