0

我使用 Spring Boot 和 WebFlux 运行基于 Netty 的 Kotlin 应用程序。详细情况如下:

  • 爪哇 11;
  • 科特林 1.3.61;
  • Spring Boot 2.2.5.RELEASE;
  • Spring Vault 核心 2.2.2.RELEASE。

我在 web 层上得到一个文件。WebFlux 从中创建一个Part( org.springframework.http.codec.multipart)。数据作为大小为 4Kb的块流存储在 Project ReactorFlux中:PartDataBuffer

Flux<DataBuffer> content();

由于符合框架的一致性,我将其转换Flux为 Kotlin 的Flow.

然后我在方法中使用同步 Vault 客户端encrypt(...)异步提交块(据我所知)flatMapMerge(注意encrypt(...)不是suspend,它是 HTTP 客户端顶部到远程加密提供程序的包装器):

public String encrypt(String keyName, String plaintext);

我已经检查了这个答案https://stackoverflow.com/a/58659423/6612401并发现基于流的方法应该与flow { emit(...)}.

我的问题是我可以使用这种基于流的方法而不使用suspend函数吗?或者有没有更好的方法,考虑到我正在使用runBlocking(Dispatchers.IO)一个suspend fold(...)函数。

代码如下:

@FlowPreview
@ExperimentalCoroutinesApi
private fun getOpenByteArrayAndEncryptText(part: Part): Pair<ByteArray, String> = runBlocking(Dispatchers.IO) {
    val pair = part.content().asFlow()
            .flatMapMerge { dataBuffer ->
                val openByteArray = dataBuffer.asInputStream().readBytes()
                val opentextBase64 = Base64Utils.encodeToString(openByteArray)
                flow { emit(Pair(openByteArray,  vaultTransitTemplate.encrypt(KEY_NAME, opentextBase64))) }
            }.fold(Pair(ByteArrayOutputStream(), StringBuilder())) { result, curPair ->
                result.first.writeBytes(curPair.first)
                result.second.append(curPair.second)
                result
            }
    Pair(pair.first.toByteArray(), pair.second.toString())
}

PS 该fold(...)函数将打开的块收集到 aByteArrayOutputStream以稍后计算哈希,并将加密的块收集到 aStringBuilder作为加密文件的结果。

PPS 我已经尝试过我的方法。该方法在我的 Core i5 8gen 4 物理内核机器上平均提交 5-7 个并行请求。它完成了它的工作,但没有那么快。如果 Vault 不在本地部署,我每 1 Mb 加密大约需要 1 秒。我知道这取决于网络的延迟。我什至不考虑 Vault 一侧的加密速度,由于只有 4Kb 的块大小,它快如闪电。有什么方法可以提高并发速度吗?

PPPS 我试过concurrency = MAX_CONCURRENT_REQUESTSflatMapMerge{...}. 到目前为止,没有什么显着的结果。最好将其保留为默认值。

4

0 回答 0