我有以下通量:
Flux.range(0, 100)
.log("before window")
.window(10)
.map(Flux::toList)
.log("after window")
.map((w) -> {
System.out.println(w.subscribe().get()));
return 1;
})
.reduce(0, (a, b) -> a + b)
.doOnSuccess(System.out::println)
.subscribe();
我想在doOnNext
这一刻我应该能够获得物品块。但.get
总是以异常失败Timeout on Mono blocking read
。
我从日志中看到,之前都已处理过.window
,结果日志给出:
[ main] before window : onNext(0)
[ main] after window : onNext({ operator : "BufferAll" })
官方文档中没有太多信息,所以我想我误解了一些东西,并且window
错误地使用了函数。但是我到底做错了什么?
更新
我发现如果我改用这个特殊问题可以避免.doOnSuccess
。喜欢:
.map((w) -> {
w.doOnSuccess((w2) -> System.out.println(w2))).subscribe();
return 1;
}
但真正的问题是,在我的情况下,我需要根据对提供的数据(而不是1
)的一些计算返回一个数字。我可以在这里创建一个新Mono
的,但无论如何,以后我应该.get
它。例如在 final 中.reduce
。因此,如果我这样做.reduce(0, (a, b) -> a + b.get())
,那么它将在那里失败。
我如何安全地从 Mono 中获得价值?
更新 2
现在我已经删除了 Flux:toList 并自己完成,Mono
从窗口后的映射阶段返回。这大概就是应该的样子。
.window(10)
.log("after window")
.map((w) -> {
//basically i'm reducing Flux to a Mono<List> and return number of a [good] elements in it
return w.reduce(...).map(ids -> 100).subscribe();
})
.reduce(0, (a, b) -> a + b.get())
但它无论如何都不起作用,卡在.reduce
:
请注意,如果我删除.reduce
步骤,它会起作用。在这种情况下,Flux
提供者的处理在主流程之后.window
执行。我无法控制它,甚至无法得到最终结果。这没有任何意义。