我是 Spring 的 Project Reactor 的新手,我不完全确定如何执行某些操作:
我有我的管道,管道返回记录。都好。
但我想计算这些记录,然后做一些事情(比如 if else),如果返回的记录 > X 则错误,否则继续。
知道 Count 返回 a Mono<Long>
,那之后我会丢失记录,我该怎么办?
我在想:
不知何故在此平面图中使用flatMap
和执行某些操作。不知何故,我看到reduce
Flux 中有一种方法可能会有所帮助。
关键是,我不确定如何进行。
我是 Spring 的 Project Reactor 的新手,我不完全确定如何执行某些操作:
我有我的管道,管道返回记录。都好。
但我想计算这些记录,然后做一些事情(比如 if else),如果返回的记录 > X 则错误,否则继续。
知道 Count 返回 a Mono<Long>
,那之后我会丢失记录,我该怎么办?
我在想:
不知何故在此平面图中使用flatMap
和执行某些操作。不知何故,我看到reduce
Flux 中有一种方法可能会有所帮助。
关键是,我不确定如何进行。
不完全确定您想要什么,因此将根据假设提供两个建议
1..您想收集所有元素然后评估是否超过n,如果是则抛出错误。您可以使用 collectList,计算元素,然后在任何情况下转换回通量。doStuff
如果总数低于限制,这只会在任何元素上。
Flux.range(1,10)
.collectList()
.flatMap(s ->
s.size()>7
? Mono.error(new RuntimeException("TOO MANY!"))
: Mono.just(s))
.flatMapMany(Flux::fromIterable)
.map(this::doStuff)
2.. 您想动态评估元素的数量,您可以使用外部原子计数器。这将doStuff
适用于每个元素,直到有问题的元素。
AtomicLong count = new AtomicLong();
Flux.range(1,10)
.flatMap(s ->
count.incrementAndGet() > 7
? Flux.error(new RuntimeException("TOO MANY!"))
: Flux.just(s))
.map(this::doStuff);
我也是 Spring Reactor 和响应式编程的新手,但我尝试过,这对我返回 Flux 元素编号的 Long 值很有用:
fluxObject.count().block().longValue()
shortValue()
在这种情况下,您也可以使用。