每在这里我曾经有代码
EmitterProcessor<String> emitter = EmitterProcessor.create();
FluxSink<String> sink = emitter.sink(FluxSink.OverflowStrategy.LATEST);
sink.onCancel(() -> {
cancelSink(id, request);
});
例如,当使用rSocket浏览器打开会话并询问一些数据时,EmitterProcessor当客户端关闭浏览器时publisher调用
Flux<String> out = Flux
.from(emitter
.log(log.getName()));
会知道 Flux 订阅者被取消(当浏览器关闭时)并且会调用onCancel句柄。
随着Sinks.Many()我已经实施
Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();
sink.asFlux().doOnCancel(() -> {
cancelSink(id, request);
});
Flux<String> out = Flux
.from(sink.asFlux()
.log(log.getName()));
并且字符串通过通量发布到浏览器,但是当客户端关闭会话时,不再onCancel需要处理一些整理。