0

有人可以解释一下这个takeUntilOther()方法是如何工作的吗?我尝试运行以下代码,但它在我的控制台上没有显示任何内容。

     Mono.just(10)
                .subscribe();

        Flux.range(1, 5)
                .takeUntilOther(Mono.just(10))
                .subscribe(System.out::println);

我不明白为什么。

4

1 回答 1

1

Kirill,

I'd suggest you referring to the appropriate part of the project reactor's documentation.

enter image description here

takeUntilOther(Publisher<?> other) Relay values from this Flux until the given Publisher emits.

Meaning, you will be receiving values from the original Flux until given Publisher<?> other starts producing events. In your case, you have a hot publisher just() that interrupts the original Flux immediately (by calling cancel() method).

I will give you one more example. Have a look at the following code snippet:

Flux.range(1, 5) // produces elements from 1 to 5
        .delayElements(Duration.ofSeconds(1)) // delays emission of each element from above for 1 second
        .takeUntilOther(Mono
                .just(10) // hot publisher. emits one element

                // delays '10' for 3 seconds. meaning that it will only 
                // appears in the original Flux in 3 seconds
                .delayElement(Duration.ofSeconds(3)) 
        )
        .subscribe(System.out::print);

The output is:

12

于 2020-02-21T18:14:05.253 回答