3

我是响应式编程的新手,我想并行进行两个 API 调用并处理结果并返回一个简单的数组或项目列表。

我有两个函数,一个返回 Flux,另一个返回 Mono,我根据 Mono 的结果对 Flux 发出的项目进行了非常简单的过滤逻辑。

我尝试使用zipWith,但无论采用何种过滤逻辑,只有一项完成。我也尝试过,block但控制器内部不允许这样做:/

@GetMapping("/{id}/offers")
fun viewTaskOffers(
        @PathVariable("id") id: String,
        @AuthenticationPrincipal user: UserPrincipal
) : Flux<ViewOfferDTO> {
    data class TaskOfferPair(
        val task: TaskDTO,
        val offer: ViewOfferDTO
    )

    return client.getTaskOffers(id).map {
            it.toViewOfferDTO()
        }.zipWith(client.getTask(id), BiFunction {
            offer: ViewOfferDTO, task: TaskDTO -> TaskOfferPair(task, offer)
        }).filter {
            it.offer.workerUser.id == user.id || it.task.creatorUser == user.id
        }.map {
            it.offer
        }
}
  • getTaskOffers返回一个通量OfferDTO
  • getTask返回一个 MonoTaskDTO

如果您无法回答我的问题,请至少告诉我如何并行执行多个 API 调用并等待 WebClient 中的结果

4

3 回答 3

4

这是一个并行调用的用例。

public Mono<UserInfo> fetchCarrierUserInfo(User user) {
        Mono<UserInfo> userInfoMono = fetchUserInfo(user.getGuid());
        Mono<CarrierInfo> carrierInfoMono = fetchCarrierInfo(user.getCarrierGuid());

        return Mono.zip(userInfoMono, carrierInfoMono).map(tuple -> {
            UserInfo userInfo = tuple.getT1();
            userInfo.setCarrier(tuple.getT2());
            return userInfo;
        });
    }

这里:

  • fetchUserInfo进行 http 调用以从另一个服务获取用户信息并返回Mono
  • fetchCarrierInfo方法进行 HTTP 调用以从另一个服务获取 carrierInfo 并返回Mono
  • Mono.zip()将给定的 Mono 合并到一个新的 Mono 中,当所有给定的 Mono 都产生了一个项目时,新的 Mono 将被实现,并将它们的值聚合到一个 Tuple2 中。

然后,调用fetchCarrierUserInfo().block()它以获得最终结果。

于 2019-12-09T10:08:27.733 回答
1

正如您已经知道的那样,zipWith它不会帮助您,因为它会产生min(a.size, b.size),它始终是 1,以防其中一个是Mono

但由于这两个是独立的,您可以简单地拆分它们:

val task: Mono<TaskDTO> = client.getTask(id)
val result: Flux<ViewOfferDTO> = 
task.flatMapMany {t ->
        client.getTaskOffers(id).map {offer ->
            t to offer
        }
    }.filter {
        it.second.workerUser.id == user.id || it.first.creatorUser == user.id
    }.map {
        it.second
}

请注意,如果您想要一对元素,您可以使用内置的Pair.

此外,此检查没有多大意义,因为您只有Monoit.first.creatorUser

于 2019-06-05T22:07:26.613 回答
1

使用repeat()将 Mono 转换为 Flux :

client.getTask(id).cache().repeat();

所以你的代码会变成

    return client.getTaskOffers(id).map {
        it.toViewOfferDTO()
    }.zipWith(client.getTask(id).cache().repeat(), BiFunction {
        offer: ViewOfferDTO, task: TaskDTO -> TaskOfferPair(task, offer)
    }).filter {
        it.offer.workerUser.id == user.id || it.task.creatorUser == user.id
    }.map {
        it.offer
    }
于 2019-12-10T11:57:12.770 回答