该线程是 Github 问题的延续:https ://github.com/spring-projects/spring-data-r2dbc/issues/194
语境:
你好,
我刚刚尝试了一个非常简单的示例,它基于两个反应式存储库:
给定 br
一个 r2dbc crud repo 和cr
另一个 r2dbc crud repo:
br.findAll()
.flatMap(br -> {
return cr.findById(br.getPropertyOne())
.doOnNext(c -> br.setProperty2(c))
.thenReturn(br);
})
.collectList().block();
此代码示例永远不会完成(只有前 250 个左右的条目到达.collectList
操作员)。经过一番挖掘,在 之后添加一些onBackpressureXXX
运算符findAll
似乎可以通过...删除元素或缓冲它们来“解决”问题。
在这一点上,我的理解是 r2dbc 反应式存储库不使用消费者反馈机制,这消除了 r2dbc 的大部分好处。
我错了吗 ?有没有更好的方法来实现相同的目标?
谢谢 !
来自@mp911de 的建议:
作为一般规则,避免在另一个流处于活动状态时创建流(著名引用:不要跨越流)。
如果您想获取相关数据,那么理想情况下将所有结果收集为 List 和运行子查询。这样,初始响应流被消耗并且连接可以自由地获取额外的结果。
类似以下代码段的内容应该可以完成这项工作:
br.findAll().collectList()
.flatMap(it -> {
List<Mono<Reference>> refs = new ArrayList<>();
for (Person p : it) {
refs.add(cr.findById(br.getPropertyOne()).doOnNext(…));
}
return Flux.concat(refs).thenReturn(it);
});
但这消除了流式传输数据而不将其全部保存在内存中的好处(我的最后一步不是列出,而是流式写入以输出到某个文件)。
对此有任何帮助吗?