8

我开始使用 Project reactor,而我很少挣扎的一个地方是如何将来自 Mono 的东西与 Flux 结合起来。这是我的用例:

public interface GroupRepository {
       Mono<GroupModel> getGroup(Long groupId);
}

public interface UserRepository {
       Flux<User> getUsers(Set<Long> userIds);   
}

Mono<GroupModel> groupMono = getGroup(groupId);
Flux<User> userFlux = getUsers(Set<Long> users);
//run above instrtuction in parallel and associate user to group.

现在我想要实现的是:

如何结合来自 UserFlux 的响应并将这些用户与该组相关联,例如 group.addUsers(userfromFlux)。

有人可以帮助如何组合来自 userFlux 和 groupMono 的结果。我想我使用像 Zip 这样的东西,但它会从源代码进行一对一的映射。就我而言,我需要进行 1 到 N 映射。在这里,我有一个组,但需要将多个用户添加到该组中。返回Mono<List<Users>然后将 zip 运算符与单声道一起使用并提供此处提到的组合器是一个好主意
public static <T1, T2, O> Flux<O> zip(Publisher<? extends T1> source1, Publisher<? extends T2> source2, final BiFunction<? super T1, ? super T2, ? extends O> combinator)吗?

4

4 回答 4

6

这种 1 到 N 的映射听起来类似于我在这里给出的答案:

你能 Flux.zip 一个单声道和一个助焊剂,并为每个助焊剂值重复单声道值吗?

万一该链接断开,这又是答案。我认为这种方法不会有很好的性能,因为每次都会重新计算单声道。为了获得更好的性能,如果您的 Mono 包含缓慢的操作,那么拥有一些缓存层可能会很好。

假设你有一个通量和一个像这样的单声道:

 // a flux that contains 6 elements.
 final Flux<Integer> userIds = Flux.fromIterable(List.of(1,2,3,4,5,6));

 // a mono of 1 element.
 final Mono<String> groupLabel = Mono.just("someGroupLabel");

首先,我将向您展示尝试压缩我尝试的 2 的错误方法,我认为其他人会尝试:

 // wrong way - this will only emit 1 event 
 final Flux<Tuple2<Integer, String>> wrongWayOfZippingFluxToMono = userIds
         .zipWith(groupLabel);

 // you'll see that onNext() is only called once, 
 //     emitting 1 item from the mono and first item from the flux.
 wrongWayOfZippingFluxToMono
         .log()
         .subscribe();

错误道

 // this is how to zip up the flux and mono how you'd want, 
 //     such that every time the flux emits, the mono emits. 
 final Flux<Tuple2<Integer, String>> correctWayOfZippingFluxToMono = userIds
         .flatMap(userId -> Mono.just(userId)
                 .zipWith(groupLabel));

 // you'll see that onNext() is called 6 times here, as desired. 
 correctWayOfZippingFluxToMono
         .log()
         .subscribe();

在此处输入图像描述

于 2020-05-16T05:48:29.847 回答
3

我认为Flux.combineLatest静态方法可以帮助您:因为您Mono只发出 1 个元素,该元素将始终是与来自Flux.

Flux.combineLatest(arr -> new Combination((GroupModel) arr[0], (User) arr[1]),
                   groupMono, userFlux);
于 2017-04-20T08:37:21.257 回答
2
Flux.fromIterable(List.of(1,2,3,4,5,6))
      .zipWith(Mono.just("groupLabel").cache().repeat())

将您的标签压缩到通量发出的每个值

于 2021-10-27T16:26:41.843 回答
1

为其他人添加答案,我使用了Flux.zip(groupMono, userMono(holding list of users), this::biFunctionToPopulateGroupWithUsers). 我使用了这种方法,而不是 @Simon 建议的方法,因为持有用户的底层组是 aHashSet并且以反应方式添加用户不会是线程安全的。但如果你有一个线程安全的数据结构,我会使用@Simon 的建议。

于 2017-04-20T20:07:03.303 回答