2

该线程是 Github 问题的延续:https ://github.com/spring-projects/spring-data-r2dbc/issues/194

语境:

你好,

我刚刚尝试了一个非常简单的示例,它基于两个反应式存储库:

给定 br一个 r2dbc crud repo 和cr另一个 r2dbc crud repo:

br.findAll()
   .flatMap(br -> {
      return cr.findById(br.getPropertyOne())
               .doOnNext(c -> br.setProperty2(c))
               .thenReturn(br);
    })
   .collectList().block();

此代码示例永远不会完成(只有前 250 个左右的条目到达.collectList操作员)。经过一番挖掘,在 之后添加一些onBackpressureXXX运算符findAll似乎可以通过...删除元素或缓冲它们来“解决”问题。

在这一点上,我的理解是 r2dbc 反应式存储库不使用消费者反馈机制,这消除了 r2dbc 的大部分好处。

我错了吗 ?有没有更好的方法来实现相同的目标?

谢谢 !


来自@mp911de 的建议:

作为一般规则,避免在另一个流处于活动状态时创建流(著名引用:不要跨越流)。

如果您想获取相关数据,那么理想情况下将所有结果收集为 List 和运行子查询。这样,初始响应流被消耗并且连接可以自由地获取额外的结果。

类似以下代码段的内容应该可以完成这项工作:

br.findAll().collectList()
        .flatMap(it -> {

            List<Mono<Reference>> refs = new ArrayList<>();
            for (Person p : it) {
                 refs.add(cr.findById(br.getPropertyOne()).doOnNext(…));
            }

            return Flux.concat(refs).thenReturn(it);
        });

但这消除了流式传输数据而不将其全部保存在内存中的好处(我的最后一步不是列出,而是流式写入以输出到某个文件)。

对此有任何帮助吗?

4

0 回答 0