我们正在使用 project-reactor 从外部 web 服务中检索一些数据并生成一堆结果对象。
首先,我们需要获取一些触发下一次 Web 服务调用所需的主数据。在主数据可用后,我们会根据主数据的结果检索更多数据。接下来我们必须等待所有 Monos 发出它的结果。然后我们处理所有数据并构建我们的结果对象。
我们在反应流方面没有太多经验。我们的嵌套订阅解决方案有效,但我们相信可能有更好的方法来归档我们想要做的事情。
问题 1
Masterdata_A 和 Masterdata_B 可以并行获取,但是如何在不嵌套的情况下以反应方式表达呢?getFluxMasterdata_B 的每个结果都应该与 getMonoMasterdata_A 的一个结果相结合。
问题2
具有两个 Masterdata 的 Tupel 应该以某种方式受到限制,以免 Web 服务因许多数据请求而不堪重负。1 秒的实际延迟只是一个似乎可行的猜测,但最好定义第一个内部 flatMap 的最大并行执行数,以便一次最多有 N 个等待的 web 服务调用。
问题 3
将来我们可能必须从 Web 服务中获取更多数据来构建 ProcessingResult。是否有定义反应流以使其可读/可理解的最佳实践?反应流的嵌套可以还是应该避免(将所有内容保持在顶层)?
领域模型
private static class Masterdata_A
{
private List<MasterdataRecord_A> records;
}
private static class MasterdataRecord_A { /* ... business relevant fields */ }
private static class MasterdataRecord_B { /* ... business relevant fields */ }
private static class Data_A { /* ... business relevant fields */ }
private static class Data_B { /* ... business relevant fields */ }
private static class Data_C { /* ... business relevant fields */ }
private static class ProcessingResult { /* ... business relevant fields */ }
WebserviceImpl
private static class Webservice
{
private Mono<Masterdata_A> getMonoMasterdata_A() { /* fetch data from external webservice */ }
private Flux<MasterdataRecord_B> getFluxMasterdata_B() { /* fetch data from external webservice */ }
private Mono<Data_A> getMonoData_A() { /* fetch data from external webservice */ }
private Mono<Data_B> getMonoData_B() { /* fetch data from external webservice */ }
private Mono<Data_C> getMonoData_C() { /* fetch data from external webservice */ }
}
业务服务实现
public class BusinessService
{
public void processData(...params...)
{
Webservice webservie = getWebservice();
// As soon as Mono<Masterdata_A> emits its result AND Flux<Masterdata_B> emits its first result than the first inner flatMap should be executed
// to fetch some extra data from the service based on the actual masterdata.
// For building the ProcessingResult we need access to all data available in the actual context.
webservice.getMonoMasterdata_A()
.subscribe((Masterdata_A masterdataA) -> {
webservice.getFluxMasterdata_B()
.delayElements(Duration.ofSeconds(1))
.flatMap((MasterdataRecord_B masterdataB) -> {
Mono<Data_A> monoA = webservice.getMonoData_A(masterdataA);
Mono<Data_B> monoB = webservice.getMonoData_B(masterdataB);
Mono<Data_C> monoC = webservice.getMonoData_C(masterdataA, masterdataB);
// wait for result of all Monos
return Mono.zip(monoA, monoB, monoC);
})
.flatMap((Tuple3<Data_A, Data_B, Data_C> data) -> {
Data_A dataA = data.getT1();
Data_B dataB = data.getT2();
Data_C dataC = data.getT3();
// create result from masterdataA, masterdataB, dataA, dataB, dataC
ProcessingResult result = ...;
return Mono.just(result);
})
.subscribe(processingResult -> {
// store result to db/filesystem
});
});
}
}