0

我正在实现一个典型的用例,其中客户端请求将异步生成的资源。因此,会立即生成并返回一个 resourceID:

1. CLIENT ---(POST /request-resource)--->  SERVER
2. SERVER (Generates resID, launches async process) ---(202 Accepted - resID)---> CLIENT

此时 SERVER 中有一个后台任务,它最终会产生一个结果并将其存储在与 resID 关联的数据库中。客户端会定期请求资源,重试直到可用:

3. CLIENT ---(/resource/resID)--->  SERVER (checks in Postgres using reactive driver)
4. SERVER ---(404 - Retry-After 5)---> CLIENT
5. CLIENT ---(/resource/resID)--->  SERVER (checks in Postgres using reactive driver)
6. SERVER ---(200 - JSON Payload)---> CLIENT

我认为 RSocket 将是一个完美的选择,以避免这种无休止的 CLIENT 重试,直到资源可用(步骤 3. on)。

哪种交互模型更适合这个问题,我该如何实现它?

考虑如下存储库:ResourceRepository.Mono<Result> getResult(String resID)

如果我选择请求/响应交互模型,我的情况与以前相同。除非有办法让 Mono 重试直到有结果。这可能吗?

使用请求/流,我可以返回Flux<Response>与 response.status=PROCESSING 类似的结果,直到对 Postgre 的查询返回结果,然后 Flux 将具有 response.status=OK 的元素,并且 Flux 将完成。在配置的时间段内完成 Flux 需要最长的时间而没有结果。在这种情况下,我怎么能提出这个问题?

我需要创建一个 Flux,它定期发出(具有最大周期超时),当存储库返回一个空的 Mono 时有一个没有结果的元素,或者当存储库有它时有一个实际值,完成 Flux。

4

1 回答 1

0

使用 RSocket 和 RequestResponse 交互模型来解决这个问题,该模型等待资源在 DB 中可用。关键是使用repeatWhenEmpty运算符:

    @MessageMapping("request-resource")
    fun getResourceWebSocket(resourceRequest: ResourceRequest): Mono<Resource> {
        return resourceService.sendResourceRequestProcessing(resourceRequest)
    }

    override fun sendResourceRequestMessage(resourceRequest: ResourceRequest): Mono<Resource> {
        val resourceId = randomUUID().toString()
        return Mono.fromCallable {
            sendKafkaResourceProcessingRequestMessage(resourceId, resourceRequest)
        }.then(poolResourceResponse(resourceId))
    }
    private fun poolResourceResponse(resourceId: String): Mono<Resource> {
        return resourceRepository.findByResourceId(resourceId)
                .repeatWhenEmpty(30) { longFlux ->
                    longFlux.delayElements(Duration.ofSeconds(1))
                            .doOnNext { logger.info("Repeating {}", it) }
                }
    }
于 2020-06-05T12:37:35.287 回答