1

假设我有两个通量Flux<Class1>Flux<Class2>并且 Class1 和 Class2 都有一个共同的属性,比如“id”。

用例是基于公共属性“id”连接两个通量并构造单个Flux<Tuple<Class1, Class2>>,类似于连接两个 sql 表。

- 对于属性 id,两个通量之间总是存在 1 对 1 的匹配。

- 助焊剂不会包含超过 100 个对象。

- 助焊剂不按 id 排序。

我如何在 Project Reactor/Spring web Flux 中实现这一点?

4

2 回答 2

0

我认为这应该适用于以下限制:

  • 第二个Flux需要向所有订阅者发出相同的元素,因为它会一遍又一遍地订阅。
  • 这基本上相当于嵌套循环连接,因此对于大通量来说效率非常低。
  • 第一个元素的每个元素Flux在第二个元素中都有一个匹配元素。

    flux1.flatMap(
        f1 -> flux2.filter(f2 -> f2.id.equals(f1.id)).take(1)) // take the first with matching id
              .map(f2 -> Tuple.of(f1,f2))) // convert to tuple.
    

在没有 IDE 的情况下编写。考虑伪代码。

于 2018-07-20T10:26:10.273 回答
0

假如说:

  • 两个集合都不是很大(您可以将它们保存在内存中而不会冒 OOM 问题的风险)
  • 它们不是按 id 排序的
  • 集合中的每个元素在另一个中都有对应的元素

首先,您应该制作这些Class1Class2实现Comparable或至少准备一个比较器实现,您可以使用它来按它们的 id 对它们进行排序。

然后您可以使用zip运算符:

Flux<Class1> flux1 = ...
Flux<Class2> flux2 = ...
Flux<Tuple2<Class1,Class2>> zipped = Flux.zip(flux1.sort(comparator1), flux2.sort(comparator2));

Tuple2是一个 Reactor 核心类,可让您像这样访问元组的每个元素

Tuple2<Class1,Class2> tuple = ...
Class1 klass1 = tuple.getT1();
Class2 klass2 = tuple.getT2();

在这种情况下,sort将缓冲所有元素,如果集合很大,这可能会导致内存/延迟问题。根据这些集合中的排序方式(假设不能保证排序,但它们是批量插入的),您还可以缓冲其中的一些(使用window)并在每个窗口上进行排序(使用sort)。

当然,理想情况下,能够同时获取已排序的数据将避免缓冲数据并改善应用程序中的背压支持。

于 2018-07-20T08:41:30.897 回答