0

给定两个具有稀疏 ID 的实体流。让我们将它们建模为:

Flux<Long> stream1 = Flux.fromArray(new Long[] {1L, 3L, 4L, 5L, 6L});
Flux<Long> stream2 = Flux.fromIterable(List.of(1L, 2L, 3L, 4L, 6L, 7L));

实现一个组成管道的函数,该管道执行 SQL 中称为 FULL OUTER JOIN 的操作。这样最后调用如下代码:

public static Flux<Map.Entry<Long, Long>> fullOuterJoin(Flux<Long> stream1, Flux<Long> stream2) {
}
fullOuterJoin(stream1, stream2).log().subscribe();

产生类似于以下的结果:

onSubscribe(...)
request(...)
onNext(1=1)
onNext(null=2)
onNext(3=3)
onNext(4=4)
onNext(5=null)
onNext(6=6)
onNext(null=7)
onComplete()

不知道 .join() 是否可以使用,尝试过 .zip() 但它不会按 ID 映射它们,并在第一个序列用完元素时停止。我知道 .bufferUntil() 可以使用,但正在寻找其他一些选项,最好是我缺少的一些本机支持。任何关于如何有效实施它的想法都非常受欢迎。

4

1 回答 1

0

这不是直接适合 Reactive Streams,因为 RS 中不允许空值,并且没有“稀疏序列”的概念。试图将 SQL/Set Theory 概念硬塞进 RS 并不能保证成功:(

RxJava 中有一个答案,但它需要来自扩展的运算符,而我们在 Reactor 中还没有:

于 2017-11-28T10:42:27.540 回答