我使用 Spring WebFlux 来满足一般需求,并使用基于 WebSocket 的 Spring RSocket 来满足服务器推送和聊天需求。
Spring Boot 版本 - 2.5.6
implementation 'org.springframework.boot:spring-boot-starter-webflux'
implementation 'org.springframework.boot:spring-boot-starter-security'
implementation 'org.springframework.security:spring-security-messaging'
implementation 'org.springframework.security:spring-security-rsocket'
implementation 'org.springframework.boot:spring-boot-starter-rsocket'
application.yml
# RSocket server settings: listens on port 9091 and serves RSocket over
# WebSocket at the /rsocket path.
spring:
  rsocket:
    server:
      port: 9091
      mapping-path: /rsocket
      transport: websocket
RSocketConfiguration.java
/**
 * Declares an {@link RSocketRequester} bean that connects over WebSocket to the
 * RSocket server described by the {@code spring.rsocket.server.*} properties.
 */
@Configuration
public class RSocketConfiguration {

    /**
     * Builds a WebSocket-based requester pointing at the locally configured RSocket server.
     *
     * <p>The supplied strategies are applied to the builder so the requester encodes and
     * decodes payloads with the same codecs as the rest of the application. The connector
     * retries a failed connection twice, two seconds apart, and payload data is sent as JSON.
     *
     * @param rSocketStrategies codecs and routing configuration to apply to the requester
     * @param rSocketProps      RSocket properties supplying the server port and mapping path
     * @param serverProps       server properties, consulted only for the SSL flag
     * @return a requester targeting the URI produced by {@link #getURI}
     */
    @Bean
    public RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies, RSocketProperties rSocketProps,
            ServerProperties serverProps) {
        return RSocketRequester.builder()
                // Previously the injected strategies were ignored and builder defaults were used.
                .rsocketStrategies(rSocketStrategies)
                .rsocketConnector(connector -> connector.reconnect(Retry.fixedDelay(2, Duration.ofSeconds(2))))
                .dataMimeType(MimeTypeUtils.APPLICATION_JSON)
                .websocket(getURI(rSocketProps, serverProps));
    }

    /**
     * Assembles the WebSocket URI of the RSocket server on {@code localhost}.
     *
     * @param rSocketProps RSocket properties supplying the port and mapping path
     * @param serverProps  server properties; {@code wss} is used when SSL is enabled, {@code ws} otherwise
     * @return the URI {@code <scheme>://localhost:<port><mapping-path>}
     */
    private URI getURI(RSocketProperties rSocketProps, ServerProperties serverProps) {
        String protocol = serverProps.getSsl() != null && serverProps.getSsl().isEnabled() ? "wss" : "ws";
        // An unset mapping path would otherwise be formatted as the literal text "null".
        String mappingPath = rSocketProps.getServer().getMappingPath();
        String path = mappingPath != null ? mappingPath : "";
        return URI.create(String.format("%s://localhost:%d%s", protocol, rSocketProps.getServer().getPort(), path));
    }
}
RSocketController.java
@Controller
public class RSocketController {
private static final Map<Integer, Map<Long, RSocketRequester>> REQUESTER_MAP = new HashMap<>();
static final String SERVER = "Server";
static final String RESPONSE = "Response";
static final String STREAM = "Stream";
static final String CHANNEL = "Channel";
@MessageMapping("/api/v1/service/chat/hello")
public Mono<String> test() {
return Mono.just("Hello");
}
@ConnectMapping("usr-id")
void onConnect(RSocketRequester rSocketRequester, @Payload Long userId, @AuthenticationPrincipal UserDetails user) {
rSocketRequester.rsocket().onClose().subscribe(null, null,
() -> REQUESTER_MAP.get(user.getBranchId()).remove(user.getUserId(), rSocketRequester));
final Map<Long, RSocketRequester> userSocket = new HashMap<>();
userSocket.put(userId, rSocketRequester);
REQUESTER_MAP.put(user.getBranchId(), userSocket);
}
/**
* This @MessageMapping is intended to be used "request --> response" style. For
* each Message received, a new Message is returned with ORIGIN=Server and
* INTERACTION=Request-Response.
*
* @param request
* @return Message
*/
//@PreAuthorize("hasRole('USER')")
@MessageMapping("request-response")
Mono<Message> requestResponse(final Message request, @AuthenticationPrincipal UserDetails user) {
// create a single Message and return it
return Mono.just(new Message("Test"));
}
/**
* This @MessageMapping is intended to be used "fire --> forget" style. When a
* new CommandRequest is received, nothing is returned (void)
*
* @param request
* @return
*/
//@PreAuthorize("hasRole('USER')")
@MessageMapping("fire-and-forget")
public Mono<Void> fireAndForget(final Message request, @AuthenticationPrincipal UserDetails user) {
// REQUESTER_MAP.get("").rsocket().fireAndForget(null);
return Mono.empty();
}
}
RSocket 的测试客户端:
/**
 * Minimal stand-alone client: opens an RSocket connection over WebSocket to
 * {@code ws://localhost:9091/rsocket}, prints the connection's availability and
 * then disposes of the connection.
 */
public class ExampleClient {

    /**
     * Connects, reports availability, and always releases the connection afterwards.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        final URI endpoint = URI.create("ws://localhost:9091/rsocket");
        final WebsocketClientTransport transport = WebsocketClientTransport.create(endpoint);
        // Blocks the calling thread until the connection attempt finishes.
        final RSocket connection = RSocketConnector.connectWith(transport).block();
        try {
            final double availability = connection.availability();
            System.out.println("Test Availability " + availability);
        } finally {
            connection.dispose();
        }
    }
}
执行上述客户端代码时,服务器端出现如下异常:
2021-11-05 10:38:34.145 ERROR [AdminService,,] 11848 --- [ctor-http-nio-3] r.c.p.Operators : Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.concurrent.CancellationException: Disposed
Caused by: java.util.concurrent.CancellationException: Disposed
at io.rsocket.internal.UnboundedProcessor.dispose(UnboundedProcessor.java:545) ~[rsocket-core-1.1.1.jar:?]
at io.rsocket.transport.netty.WebsocketDuplexConnection.doOnClose(WebsocketDuplexConnection.java:72) ~[rsocket-transport-netty-1.1.1.jar:?]
at io.rsocket.internal.BaseDuplexConnection.lambda$new$0(BaseDuplexConnection.java:30) ~[rsocket-core-1.1.1.jar:?]
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.runFinally(FluxDoFinally.java:163) ~[reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onComplete(FluxDoFinally.java:146) ~[reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.SinkEmptyMulticast$VoidInner.complete(SinkEmptyMulticast.java:238) ~[reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.SinkEmptyMulticast.tryEmitEmpty(SinkEmptyMulticast.java:70) ~[reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.SinkEmptySerialized.tryEmitEmpty(SinkEmptySerialized.java:46) ~[reactor-core-3.4.10.jar:3.4.10]
at io.rsocket.transport.netty.WebsocketDuplexConnection.lambda$sendErrorAndClose$2(WebsocketDuplexConnection.java:94) ~[rsocket-transport-netty-1.1.1.jar:?]
at reactor.core.publisher.LambdaMonoSubscriber.onComplete(LambdaMonoSubscriber.java:135) ~[reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onComplete(MonoPeekTerminal.java:299) ~[reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onComplete(FluxPeekFuseable.java:940) ~[reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.Operators.complete(Operators.java:137) ~[reactor-core-3.4.10.jar:3.4.10]
at reactor.netty.FutureMono.doSubscribe(FutureMono.java:122) ~[reactor-netty-core-1.0.11.jar:1.0.11]
at reactor.netty.FutureMono$DeferredFutureMono.subscribe(FutureMono.java:114) ~[reactor-netty-core-1.0.11.jar:1.0.11]
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.4.10.jar:3.4.10]
at org.springframework.cloud.sleuth.instrument.reactor.SleuthMonoLift.subscribe(ReactorHooksHelper.java:225) ~[spring-cloud-sleuth-instrumentation-3.0.2-SNAPSHOT.jar:3.0.2-SNAPSHOT]
at reactor.core.publisher.Mono.subscribe(Mono.java:4361) ~[reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.Mono.subscribeWith(Mono.java:4476) ~[reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.Mono.subscribe(Mono.java:4332) ~[reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.Mono.subscribe(Mono.java:4268) ~[reactor-core-3.4.10.jar:3.4.10]
at io.rsocket.transport.netty.WebsocketDuplexConnection.sendErrorAndClose(WebsocketDuplexConnection.java:88) ~[rsocket-transport-netty-1.1.1.jar:?]
at io.rsocket.core.SetupHandlingDuplexConnection.sendErrorAndClose(SetupHandlingDuplexConnection.java:163) ~[rsocket-core-1.1.1.jar:?]
at io.rsocket.core.ServerSetup.sendError(ServerSetup.java:55) ~[rsocket-core-1.1.1.jar:?]
at io.rsocket.core.RSocketServer.lambda$null$1(RSocketServer.java:441) ~[rsocket-core-1.1.1.jar:?]
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onError(MonoPeekTerminal.java:250) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:172) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.Operators$MonoSubscriber.onError(Operators.java:1863) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onError(MonoIgnoreThen.java:270) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:172) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:172) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onError(Operators.java:2063) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onError(MonoIgnoreThen.java:270) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:172) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:172) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onError(Operators.java:2063) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onError(MonoIgnoreThen.java:270) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:172) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onError(Operators.java:2063) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onError(MonoIgnoreThen.java:270) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onError(FluxContextWrite.java:121) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onError(MonoIgnoreThen.java:270) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:172) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onError(Operators.java:2063) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.Operators.error(Operators.java:198) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.MonoError.subscribe(MonoError.java:53) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.Mono.subscribe(Mono.java:4361) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:82) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.FluxFilter$FilterSubscriber.onComplete(FluxFilter.java:166) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:85) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.MonoNext$NextSubscriber.onComplete(MonoNext.java:102) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:83) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:282) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:861) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onComplete(FluxDefaultIfEmpty.java:109) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.FluxFilter$FilterSubscriber.onComplete(FluxFilter.java:166) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:85) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onComplete(FluxMapFuseable.java:150) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onComplete(FluxFilterFuseable.java:171) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1817) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.FluxFilter$FilterSubscriber.onNext(FluxFilter.java:113) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2398) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.FluxFilter$FilterSubscriber.request(FluxFilter.java:186) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onSubscribe(MonoFlatMap.java:110) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.FluxFilter$FilterSubscriber.onSubscribe(FluxFilter.java:85) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.MonoCurrentContext.subscribe(MonoCurrentContext.java:36) [reactor-core-3.4.10.jar:3.4.10]
at org.springframework.cloud.sleuth.instrument.reactor.SleuthMonoLift.subscribe(ReactorHooksHelper.java:225) [spring-cloud-sleuth-instrumentation-3.0.2-SNAPSHOT.jar:3.0.2-SNAPSHOT]
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onNext(FluxFilterFuseable.java:118) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2398) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.request(FluxFilterFuseable.java:191) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:169) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onSubscribe(MonoFlatMap.java:110) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onSubscribe(FluxFilterFuseable.java:87) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.Mono.subscribe(Mono.java:4361) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:449) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:219) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:165) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:87) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.Mono.subscribe(Mono.java:4361) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:255) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51) [reactor-core-3.4.10.jar:3.4.10]
at org.springframework.cloud.sleuth.instrument.reactor.SleuthMonoLift.subscribe(ReactorHooksHelper.java:225) [spring-cloud-sleuth-instrumentation-3.0.2-SNAPSHOT.jar:3.0.2-SNAPSHOT]
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.Mono.subscribe(Mono.java:4361) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:255) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51) [reactor-core-3.4.10.jar:3.4.10]
at org.springframework.cloud.sleuth.instrument.reactor.SleuthMonoLift.subscribe(ReactorHooksHelper.java:225) [spring-cloud-sleuth-instrumentation-3.0.2-SNAPSHOT.jar:3.0.2-SNAPSHOT]
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.Mono.subscribe(Mono.java:4361) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:82) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onComplete(MonoFlatMap.java:181) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.FluxFilter$FilterSubscriber.onComplete(FluxFilter.java:166) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2400) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.FluxFilter$FilterSubscriber.request(FluxFilter.java:186) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onSubscribe(MonoFlatMap.java:110) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.FluxFilter$FilterSubscriber.onSubscribe(FluxFilter.java:85) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.MonoCurrentContext.subscribe(MonoCurrentContext.java:36) [reactor-core-3.4.10.jar:3.4.10]
at org.springframework.cloud.sleuth.instrument.reactor.SleuthMonoLift.subscribe(ReactorHooksHelper.java:225) [spring-cloud-sleuth-instrumentation-3.0.2-SNAPSHOT.jar:3.0.2-SNAPSHOT]
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.Mono.subscribe(Mono.java:4361) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:255) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51) [reactor-core-3.4.10.jar:3.4.10]
at org.springframework.cloud.sleuth.instrument.reactor.SleuthMonoLift.subscribe(ReactorHooksHelper.java:225) [spring-cloud-sleuth-instrumentation-3.0.2-SNAPSHOT.jar:3.0.2-SNAPSHOT]
at reactor.core.publisher.Mono.subscribe(Mono.java:4361) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:82) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.Operators.complete(Operators.java:137) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:195) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:53) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:57) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.Mono.subscribe(Mono.java:4361) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:255) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51) [reactor-core-3.4.10.jar:3.4.10]
at org.springframework.cloud.sleuth.instrument.reactor.SleuthMonoLift.subscribe(ReactorHooksHelper.java:225) [spring-cloud-sleuth-instrumentation-3.0.2-SNAPSHOT.jar:3.0.2-SNAPSHOT]
at reactor.core.publisher.Mono.subscribe(Mono.java:4361) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:82) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onComplete(FluxMapFuseable.java:150) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onComplete(FluxMapFuseable.java:150) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.Operators.complete(Operators.java:137) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:195) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:53) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:57) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.Mono.subscribe(Mono.java:4361) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:255) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51) [reactor-core-3.4.10.jar:3.4.10]
at org.springframework.cloud.sleuth.instrument.reactor.SleuthMonoLift.subscribe(ReactorHooksHelper.java:225) [spring-cloud-sleuth-instrumentation-3.0.2-SNAPSHOT.jar:3.0.2-SNAPSHOT]
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.Mono.subscribe(Mono.java:4361) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.MonoIgnorePublisher.subscribe(MonoIgnorePublisher.java:57) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.FluxFirstWithSignal$FirstEmittingSubscriber.onNext(FluxFirstWithSignal.java:330) [reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.MonoCreate$DefaultMonoSink.success(MonoCreate.java:160) [reactor-core-3.4.10.jar:3.4.10]
at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:114) [rsocket-core-1.1.1.jar:?]
at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:19) [rsocket-core-1.1.1.jar:?]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120) [reactor-core-3.4.10.jar:3.4.10]
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:279) [reactor-netty-core-1.0.11.jar:1.0.11]
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:388) [reactor-netty-core-1.0.11.jar:1.0.11]
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:404) [reactor-netty-core-1.0.11.jar:1.0.11]
at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:579) [reactor-netty-http-1.0.11.jar:1.0.11]
at reactor.netty.http.server.WebsocketServerOperations.onInboundNext(WebsocketServerOperations.java:167) [reactor-netty-http-1.0.11.jar:1.0.11]
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93) [reactor-netty-core-1.0.11.jar:1.0.11]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.68.Final.jar:4.1.68.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.68.Final.jar:4.1.68.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [netty-transport-4.1.68.Final.jar:4.1.68.Final]
at io.rsocket.transport.netty.server.BaseWebsocketServerTransport$PongHandler.channelRead(BaseWebsocketServerTransport.java:63) [rsocket-transport-netty-1.1.1.jar:?]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.68.Final.jar:4.1.68.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.68.Final.jar:4.1.68.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [netty-transport-4.1.68.Final.jar:4.1.68.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) [netty-codec-4.1.68.Final.jar:4.1.68.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) [netty-codec-4.1.68.Final.jar:4.1.68.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.68.Final.jar:4.1.68.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.68.Final.jar:4.1.68.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [netty-transport-4.1.68.Final.jar:4.1.68.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) [netty-transport-4.1.68.Final.jar:4.1.68.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.68.Final.jar:4.1.68.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.68.Final.jar:4.1.68.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) [netty-transport-4.1.68.Final.jar:4.1.68.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) [netty-transport-4.1.68.Final.jar:4.1.68.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719) [netty-transport-4.1.68.Final.jar:4.1.68.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) [netty-transport-4.1.68.Final.jar:4.1.68.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) [netty-transport-4.1.68.Final.jar:4.1.68.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) [netty-transport-4.1.68.Final.jar:4.1.68.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) [netty-common-4.1.68.Final.jar:4.1.68.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.1.68.Final.jar:4.1.68.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.68.Final.jar:4.1.68.Final]
at java.lang.Thread.run(Thread.java:831) [?:?]