1

在这里我曾经有代码

EmitterProcessor<String> emitter = EmitterProcessor.create();
FluxSink<String> sink = emitter.sink(FluxSink.OverflowStrategy.LATEST);

sink.onCancel(() -> {
  cancelSink(id, request);
});

例如,当使用rSocket浏览器打开会话并询问一些数据时,EmitterProcessor当客户端关闭浏览器时publisher调用

Flux<String> out = Flux
    .from(emitter
    .log(log.getName())); 

会知道 Flux 订阅者被取消(当浏览器关闭时)并且会调用onCancel句柄。

随着Sinks.Many()我已经实施

Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();

sink.asFlux().doOnCancel(() -> {
    cancelSink(id, request);
});

Flux<String> out = Flux
    .from(sink.asFlux()
    .log(log.getName()));

并且字符串通过通量发布到浏览器,但是当客户端关闭会话时,不再onCancel需要处理一些整理。

看起来这里和这里都讨论过这个问题,但我不明白解决方案。请问是什么?

4

1 回答 1

1

sink.asFlux().doOnCancel(...)并且sink.asFlux()是两个不同的实例。您没有重复使用已设置取消处理逻辑的那个,这就是为什么您没有观察到变量的cancelSink清理。out

做更多类似的事情:

Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();

Flux<String> fluxWithCancelSupport = sink.asFlux().doOnCancel(() -> {
    cancelSink(id, request);
});

Flux<String> out = fluxWithCancelSupport
    .log(log.getName()));

(PS:你不需要,Flux.from(sink.asFlux())因为后者已经给了你一个Flux)。

于 2021-03-22T09:52:38.817 回答