我使用 Spring Boot 和 WebFlux 运行基于 Netty 的 Kotlin 应用程序。详细情况如下:
- 爪哇 11;
- 科特林 1.3.61;
- Spring Boot 2.2.5.RELEASE;
- Spring Vault 核心 2.2.2.RELEASE。
我在 web 层上得到一个文件。WebFlux 从中创建一个Part
( org.springframework.http.codec.multipart
)。数据作为大小为 4Kb的块流存储在 Project ReactorFlux
中:Part
DataBuffer
Flux<DataBuffer> content();
由于符合框架的一致性,我将其转换Flux
为 Kotlin 的Flow
.
然后我在方法中使用同步 Vault 客户端encrypt(...)
异步提交块(据我所知)flatMapMerge
(注意encrypt(...)
不是suspend
,它是 HTTP 客户端顶部到远程加密提供程序的包装器):
public String encrypt(String keyName, String plaintext);
我已经检查了这个答案https://stackoverflow.com/a/58659423/6612401并发现基于流的方法应该与flow { emit(...)}
.
我的问题是我可以使用这种基于流的方法而不使用suspend
函数吗?或者有没有更好的方法,考虑到我正在使用runBlocking(Dispatchers.IO)
一个suspend
fold(...)
函数。
代码如下:
@FlowPreview
@ExperimentalCoroutinesApi
private fun getOpenByteArrayAndEncryptText(part: Part): Pair<ByteArray, String> = runBlocking(Dispatchers.IO) {
val pair = part.content().asFlow()
.flatMapMerge { dataBuffer ->
val openByteArray = dataBuffer.asInputStream().readBytes()
val opentextBase64 = Base64Utils.encodeToString(openByteArray)
flow { emit(Pair(openByteArray, vaultTransitTemplate.encrypt(KEY_NAME, opentextBase64))) }
}.fold(Pair(ByteArrayOutputStream(), StringBuilder())) { result, curPair ->
result.first.writeBytes(curPair.first)
result.second.append(curPair.second)
result
}
Pair(pair.first.toByteArray(), pair.second.toString())
}
PS 该fold(...)
函数将打开的块收集到 aByteArrayOutputStream
以稍后计算哈希,并将加密的块收集到 aStringBuilder
作为加密文件的结果。
PPS 我已经尝试过我的方法。该方法在我的 Core i5 8gen 4 物理内核机器上平均提交 5-7 个并行请求。它完成了它的工作,但没有那么快。如果 Vault 不在本地部署,我每 1 Mb 加密大约需要 1 秒。我知道这取决于网络的延迟。我什至不考虑 Vault 一侧的加密速度,由于只有 4Kb 的块大小,它快如闪电。有什么方法可以提高并发速度吗?
PPPS 我试过concurrency = MAX_CONCURRENT_REQUESTS
在flatMapMerge{...}
. 到目前为止,没有什么显着的结果。最好将其保留为默认值。