How can I convert an io.ktor.utils.io.ByteReadChannel into a kotlinx.coroutines.flow.Flow<java.nio.ByteBuffer>?
I am using Ktor with this route:
post("/upload") {
    val channel: ByteReadChannel = call.receiveChannel()
    val flow: Flow<ByteBuffer> = channel.asByteBufferFlow() // my custom extension method
    transaction.execute {
        testDao.saveFile(flow)
    }
    call.respond("OK")
}
The DAO uses R2DBC and a Blob like this:
override suspend fun saveFile(input: Flow<ByteBuffer>) {
    val connection = requireR2DBCTransactionConnection()
    val publisher: Publisher<ByteBuffer> = input.asPublisher()
    val statement: Statement = connection.createStatement("insert into bindata (data) values ($1)")
    statement.bind(0, Blob.from(publisher))
    val count: Int = statement.execute().awaitFirst().rowsUpdated.awaitFirst()
    if (count != 1) {
        throw IllegalStateException()
    }
}
I tried to write this extension method, but could not get it to work:
fun ByteReadChannel.asByteBufferFlow(): Flow<ByteBuffer> = object : AbstractFlow<ByteBuffer>() {
    override suspend fun collectSafely(collector: FlowCollector<ByteBuffer>) {
        /* I have no idea */
    }
}
My main problem is that I have not found any similar examples, and both ByteBuffer and ByteReadChannel are new to me.
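
For reference, here is the rough shape I imagine the extension would take. This is only a sketch, not something I have verified against the route above. It uses the flow { } builder instead of AbstractFlow and assumes that readAvailable(ByteBuffer) on ByteReadChannel suspends until data arrives and returns -1 once the channel is closed. The bufferSize parameter and its default of 8192 are my own choices for illustration.

```kotlin
import io.ktor.utils.io.*
import java.nio.ByteBuffer
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow

// Sketch: read the channel chunk by chunk and emit each chunk as a ByteBuffer.
// bufferSize is a hypothetical tuning parameter, not part of any Ktor API.
fun ByteReadChannel.asByteBufferFlow(bufferSize: Int = 8192): Flow<ByteBuffer> {
    val channel = this
    return flow {
        while (true) {
            // Allocate a fresh buffer per chunk, in case the collector
            // keeps a reference to the buffer after emit() returns.
            val buffer = ByteBuffer.allocate(bufferSize)
            // Assumed contract: returns the number of bytes read, or -1 at end of stream.
            val read = channel.readAvailable(buffer)
            if (read == -1) break
            if (read > 0) {
                buffer.flip() // switch the buffer from writing to reading
                emit(buffer)
            }
        }
    }
}
```

Because the channel is consumed while reading, a flow built this way can presumably be collected only once, so saveFile would have to be its single collector.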