6

我试图弄清楚反应堆项目,我正在寻找一种取消订阅的方法。我知道在进行例如 Flux 的订阅后,我可以获得对可用于发送 onCancel 信号的 Cancellation 对象的引用,但这只是在进行订阅之后,我需要在某种集合中保存该引用。

有没有更好的方法来获取 Cancellation 对象?或者只是取消订阅。也许某种包含对所有活动订阅的引用的地方 - 是的,那会很棒......

4

1 回答 1

7

Subscription在 Reactor 中,想要在调用之前取消 a 是没有意义的subscribe()(因为正是这种方法创建了Subscription该信号并将该信号向上传播以开始发送数据)。

所有订阅都没有集中的地方,这没有多大意义,因为您需要一种找到要取消的特定订阅的方法(请记住,链中的每个运营商也可以使用中间订阅...)。

请注意,一些运营商也会代表您取消订阅!例如take(int),一旦发出足够的项目,它将取消上游:

Flux.just(1, 2, 3, 4).log().take(2).subscribe(System.out::println);

将输出:

14:17:48.729 [main] INFO  reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
14:17:48.732 [main] INFO  reactor.Flux.Array.1 - | request(unbounded)
14:17:48.732 [main] INFO  reactor.Flux.Array.1 - | onNext(1)
1
14:17:48.732 [main] INFO  reactor.Flux.Array.1 - | onNext(2)
2
14:17:48.732 [main] INFO  reactor.Flux.Array.1 - | cancel()
于 2017-01-05T13:18:42.707 回答