5

我有以下通量:

Flux.range(0, 100)
    .log("before window")
    .window(10)
    .map(Flux::toList)
    .log("after window")
    .map((w) -> {
        System.out.println(w.subscribe().get()));
        return 1;
    }) 
    .reduce(0, (a, b) -> a + b)
    .doOnSuccess(System.out::println)
    .subscribe();

我想在doOnNext这一刻我应该能够获得物品块。但.get总是以异常失败Timeout on Mono blocking read

我从日志中看到,之前都已处理过.window,结果日志给出:

[           main] before window : onNext(0)
[           main] after window  : onNext({ operator : "BufferAll" })

官方文档中没有太多信息,所以我想我误解了一些东西,并且window错误地使用了函数。但是我到底做错了什么?

更新

我发现如果我改用这个特殊问题可以避免.doOnSuccess。喜欢:

.map((w) -> {
    w.doOnSuccess((w2) -> System.out.println(w2))).subscribe();
    return 1;
}

但真正的问题是,在我的情况下,我需要根据对提供的数据(而不是1)的一些计算返回一个数字。我可以在这里创建一个新Mono的,但无论如何,以后我应该.get它。例如在 final 中.reduce。因此,如果我这样做.reduce(0, (a, b) -> a + b.get()),那么它将在那里失败。

我如何安全地从 Mono 中获得价值?

更新 2

现在我已经删除了 Flux:toList 并自己完成,Mono从窗口后的映射阶段返回。这大概就是应该的样子。

.window(10)
.log("after window")
.map((w) -> {
    //basically i'm reducing Flux to a Mono<List> and return number of a [good] elements in it
    return w.reduce(...).map(ids -> 100).subscribe();
})
.reduce(0, (a, b) -> a + b.get()) 

但它无论如何都不起作用,卡在.reduce

请注意,如果我删除.reduce步骤,它会起作用。在这种情况下,Flux提供者的处理在主流程之后.window执行。我无法控制它,甚至无法得到最终结果。这没有任何意义。

4

2 回答 2

2

问题是因为我需要在进一步使用之前减少窗口。

喜欢:

window(...).flatMap( (window) -> window.reduce(...))

我在我的映射器中的 Mono 中这样做,但它阻止了那里的执行流程,所以它不是一个正确的地方。它必须在窗口之后,在下次使用之前。

正确的版本是:

Flux.range(0, 100)
    .window(10)
    .flatMap(window -> {
         return window.reduce(new ArrayList<>(), (a, b) -> {
             a.add(b);
             return a;
         });
    })
    .map((list) -> list.size())
    .reduce(0, (a, b) -> a + b)
    .doOnSuccess(System.out::println)
    .subscribe();

我正在将 window 转换为 a List,然后我可以在以下操作中使用此值。

于 2016-05-27T04:51:20.903 回答
1

我最近一直在看这个。该.window(3)功能将其变成通量。我认为,这可以通过嵌套订阅来巧妙地处理。当窗口为 时,该.buffer()函数将通量转换为列表onComplete()

这段代码对我来说是一个突破......

Flux<Integer> flux = Flux.range(1, 10).log();

flux
  .doOnNext(s -> logger.info("pre-window:{}", s))
  .window(3)
  .subscribe(s -> s.log().buffer().subscribe(t -> logger.info("post-window:{}", t)));

此外,简单地使用.buffer(3)似乎会产生相似的结果,如果不是相同的结果......

flux
    .doOnNext(s -> logger.info("pre-buffer:{}", s))
    .buffer(3)
    .subscribe(t -> logger.info("post-buffer:{}", t));

希望有帮助!

于 2017-04-28T06:26:11.770 回答