2

我使用 Spring webflux 来满足一般要求,使用 Spring RSocket over websocket 来满足服务器推送和聊天要求。

Spring Boot 版本 - 2.5.6

implementation 'org.springframework.boot:spring-boot-starter-webflux'
implementation 'org.springframework.boot:spring-boot-starter-security'
implementation 'org.springframework.security:spring-security-messaging'
implementation 'org.springframework.security:spring-security-rsocket'
implementation 'org.springframework.boot:spring-boot-starter-rsocket'

application.yml

# Spring Boot RSocket server settings.
spring: 
  rsocket:
    server:
      # Port the RSocket server listens on.
      port: 9091
      # Path of the RSocket endpoint; the test client connects to ws://localhost:9091/rsocket.
      mapping-path: /rsocket
      # Carry RSocket frames over WebSocket.
      transport: websocket 

RSocketConfiguration.java

/**
 * Declares an {@link RSocketRequester} that connects back to this application's own
 * RSocket endpoint over WebSocket.
 */
@Configuration
public class RSocketConfiguration {

    /** Port assumed when neither the RSocket server port nor the web server port is configured. */
    private static final int DEFAULT_SERVER_PORT = 8080;

    /**
     * Builds a requester targeting the local RSocket WebSocket endpoint.
     *
     * @param rSocketStrategies codecs and routing strategies configured for the application;
     *                          applied to the requester so it encodes payloads the same way
     *                          the server decodes them
     * @param rSocketProps      RSocket server properties (port and mapping path)
     * @param serverProps       web server properties (SSL flag, fallback port)
     * @return a requester using JSON as data MIME type and reconnecting up to 2 times,
     *         2 seconds apart
     */
    @Bean
    public RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies, RSocketProperties rSocketProps,
            ServerProperties serverProps) {
        return RSocketRequester.builder()
                // Previously the injected strategies were ignored, leaving the requester
                // with the builder's defaults rather than the application's codecs.
                .rsocketStrategies(rSocketStrategies)
                .rsocketConnector(connector -> connector.reconnect(Retry.fixedDelay(2, Duration.ofSeconds(2))))
                .dataMimeType(MimeTypeUtils.APPLICATION_JSON)
                .websocket(getURI(rSocketProps, serverProps));
    }

    /**
     * Assembles the {@code ws://} or {@code wss://} URI of the local RSocket endpoint.
     *
     * @param rSocketProps RSocket server properties
     * @param serverProps  web server properties
     * @return the endpoint URI on {@code localhost}
     */
    private URI getURI(RSocketProperties rSocketProps, ServerProperties serverProps) {
        String protocol = serverProps.getSsl() != null && serverProps.getSsl().isEnabled() ? "wss" : "ws";

        // With no dedicated RSocket port the endpoint is presumably served by the web
        // server, so fall back to its port instead of formatting "null" into the URI.
        Integer port = rSocketProps.getServer().getPort();
        if (port == null) {
            port = serverProps.getPort();
        }
        if (port == null) {
            port = DEFAULT_SERVER_PORT;
        }

        String mappingPath = rSocketProps.getServer().getMappingPath();
        if (mappingPath == null) {
            mappingPath = "";
        }

        // Plain concatenation keeps the port digits locale-independent (unlike "%d").
        return URI.create(protocol + "://localhost:" + port + mappingPath);
    }

}

RSocketController.java

/**
 * RSocket message handlers plus a registry of the requesters of connected clients.
 */
@Controller
public class RSocketController {

    /**
     * Connected requesters, keyed by branch id and then by user id.
     * Concurrent maps are used because connect/close callbacks of different connections
     * may run concurrently. Note that concurrent maps reject {@code null} keys.
     */
    private static final Map<Integer, Map<Long, RSocketRequester>> REQUESTER_MAP =
            new java.util.concurrent.ConcurrentHashMap<>();

    static final String SERVER = "Server";
    static final String RESPONSE = "Response";
    static final String STREAM = "Stream";
    static final String CHANNEL = "Channel";

    /**
     * Simple probe route.
     *
     * @return a Mono emitting the fixed text {@code "Hello"}
     */
    @MessageMapping("/api/v1/service/chat/hello")
    public Mono<String> test() {
        return Mono.just("Hello");
    }

    /**
     * Registers the requester of a newly connected client and removes it again when the
     * connection terminates.
     *
     * @param rSocketRequester requester bound to the new connection
     * @param userId           user id sent as the setup payload; used as the registry key
     *                         for both registration and removal
     * @param user             authenticated principal; supplies the branch id
     */
    @ConnectMapping("usr-id")
    void onConnect(RSocketRequester rSocketRequester, @Payload Long userId, @AuthenticationPrincipal UserDetails user) {
        final Integer branchId = user.getBranchId();

        // Add to the branch's existing map instead of replacing it, so other users of the
        // same branch stay registered. compute() makes create-and-put atomic per branch.
        REQUESTER_MAP.compute(branchId, (key, users) -> {
            final Map<Long, RSocketRequester> target =
                    users != null ? users : new java.util.concurrent.ConcurrentHashMap<>();
            target.put(userId, rSocketRequester);
            return target;
        });

        // Register first, then watch for close: doFinally covers completion, error and
        // cancellation, so the entry is removed however the connection ends.
        rSocketRequester.rsocket().onClose()
                .doFinally(signal -> unregister(branchId, userId, rSocketRequester))
                .subscribe(null, error -> {
                    // Nothing to do here: cleanup already happens in doFinally.
                });
    }

    /**
     * Removes the given requester from the registry if it is still the one stored for the
     * user, and drops the branch entry once it becomes empty.
     */
    private static void unregister(Integer branchId, Long userId, RSocketRequester requester) {
        REQUESTER_MAP.computeIfPresent(branchId, (key, users) -> {
            users.remove(userId, requester);
            return users.isEmpty() ? null : users;
        });
    }

    /**
     * Request-response handler. The incoming request and principal are currently not
     * inspected; every call answers with a {@code Message} built from the text
     * {@code "Test"}.
     *
     * @param request the received message
     * @param user    authenticated principal
     * @return a Mono emitting the reply message
     */
    //@PreAuthorize("hasRole('USER')")
    @MessageMapping("request-response")
    Mono<Message> requestResponse(final Message request, @AuthenticationPrincipal UserDetails user) {
        return Mono.just(new Message("Test"));
    }

    /**
     * Fire-and-forget handler. The message is accepted and nothing is sent back.
     *
     * @param request the received message
     * @param user    authenticated principal
     * @return an empty Mono that completes immediately
     */
    //@PreAuthorize("hasRole('USER')")
    @MessageMapping("fire-and-forget")
    public Mono<Void> fireAndForget(final Message request, @AuthenticationPrincipal UserDetails user) {
        // TODO: forward the message to the target requester looked up in REQUESTER_MAP.
        return Mono.empty();
    }

}

RSocket 的测试客户端:

/**
 * Minimal stand-alone client: opens an RSocket connection over WebSocket to the local
 * server, prints the connection's availability and closes it again.
 */
public class ExampleClient {
    public static void main(String[] args) {
        final URI endpoint = URI.create("ws://localhost:9091/rsocket");
        // Blocks until the connection is established (or fails).
        final RSocket socket = RSocketConnector.connectWith(WebsocketClientTransport.create(endpoint)).block();

        try {
            final double availability = socket.availability();
            System.out.println("Test Availability " + availability);
        } finally {
            // Always release the connection, even if the availability check throws.
            socket.dispose();
        }
    }
}

执行上述客户端代码时,服务器出现以下异常:

2021-11-05 10:38:34.145 ERROR [AdminService,,] 11848 --- [ctor-http-nio-3] r.c.p.Operators : Operator called default onErrorDropped

reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.concurrent.CancellationException: Disposed
Caused by: java.util.concurrent.CancellationException: Disposed
    at io.rsocket.internal.UnboundedProcessor.dispose(UnboundedProcessor.java:545) ~[rsocket-core-1.1.1.jar:?]
    at io.rsocket.transport.netty.WebsocketDuplexConnection.doOnClose(WebsocketDuplexConnection.java:72) ~[rsocket-transport-netty-1.1.1.jar:?]
    at io.rsocket.internal.BaseDuplexConnection.lambda$new$0(BaseDuplexConnection.java:30) ~[rsocket-core-1.1.1.jar:?]
    at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.runFinally(FluxDoFinally.java:163) ~[reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onComplete(FluxDoFinally.java:146) ~[reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.SinkEmptyMulticast$VoidInner.complete(SinkEmptyMulticast.java:238) ~[reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.SinkEmptyMulticast.tryEmitEmpty(SinkEmptyMulticast.java:70) ~[reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.SinkEmptySerialized.tryEmitEmpty(SinkEmptySerialized.java:46) ~[reactor-core-3.4.10.jar:3.4.10]
    at io.rsocket.transport.netty.WebsocketDuplexConnection.lambda$sendErrorAndClose$2(WebsocketDuplexConnection.java:94) ~[rsocket-transport-netty-1.1.1.jar:?]
    at reactor.core.publisher.LambdaMonoSubscriber.onComplete(LambdaMonoSubscriber.java:135) ~[reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onComplete(MonoPeekTerminal.java:299) ~[reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onComplete(FluxPeekFuseable.java:940) ~[reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.Operators.complete(Operators.java:137) ~[reactor-core-3.4.10.jar:3.4.10]
    at reactor.netty.FutureMono.doSubscribe(FutureMono.java:122) ~[reactor-netty-core-1.0.11.jar:1.0.11]
    at reactor.netty.FutureMono$DeferredFutureMono.subscribe(FutureMono.java:114) ~[reactor-netty-core-1.0.11.jar:1.0.11]
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.4.10.jar:3.4.10]
    at org.springframework.cloud.sleuth.instrument.reactor.SleuthMonoLift.subscribe(ReactorHooksHelper.java:225) ~[spring-cloud-sleuth-instrumentation-3.0.2-SNAPSHOT.jar:3.0.2-SNAPSHOT]
    at reactor.core.publisher.Mono.subscribe(Mono.java:4361) ~[reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.Mono.subscribeWith(Mono.java:4476) ~[reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.Mono.subscribe(Mono.java:4332) ~[reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.Mono.subscribe(Mono.java:4268) ~[reactor-core-3.4.10.jar:3.4.10]
    at io.rsocket.transport.netty.WebsocketDuplexConnection.sendErrorAndClose(WebsocketDuplexConnection.java:88) ~[rsocket-transport-netty-1.1.1.jar:?]
    at io.rsocket.core.SetupHandlingDuplexConnection.sendErrorAndClose(SetupHandlingDuplexConnection.java:163) ~[rsocket-core-1.1.1.jar:?]
    at io.rsocket.core.ServerSetup.sendError(ServerSetup.java:55) ~[rsocket-core-1.1.1.jar:?]
    at io.rsocket.core.RSocketServer.lambda$null$1(RSocketServer.java:441) ~[rsocket-core-1.1.1.jar:?]
    at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onError(MonoPeekTerminal.java:250) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:172) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.Operators$MonoSubscriber.onError(Operators.java:1863) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onError(MonoIgnoreThen.java:270) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:172) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:172) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onError(Operators.java:2063) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onError(MonoIgnoreThen.java:270) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:172) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:172) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onError(Operators.java:2063) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onError(MonoIgnoreThen.java:270) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:172) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onError(Operators.java:2063) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onError(MonoIgnoreThen.java:270) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onError(FluxContextWrite.java:121) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onError(MonoIgnoreThen.java:270) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:172) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onError(Operators.java:2063) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.Operators.error(Operators.java:198) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.MonoError.subscribe(MonoError.java:53) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.Mono.subscribe(Mono.java:4361) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:82) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.FluxFilter$FilterSubscriber.onComplete(FluxFilter.java:166) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:85) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.MonoNext$NextSubscriber.onComplete(MonoNext.java:102) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:83) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:282) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:861) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onComplete(FluxDefaultIfEmpty.java:109) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.FluxFilter$FilterSubscriber.onComplete(FluxFilter.java:166) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:85) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onComplete(FluxMapFuseable.java:150) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onComplete(FluxFilterFuseable.java:171) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1817) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.FluxFilter$FilterSubscriber.onNext(FluxFilter.java:113) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2398) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.FluxFilter$FilterSubscriber.request(FluxFilter.java:186) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onSubscribe(MonoFlatMap.java:110) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.FluxFilter$FilterSubscriber.onSubscribe(FluxFilter.java:85) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.MonoCurrentContext.subscribe(MonoCurrentContext.java:36) [reactor-core-3.4.10.jar:3.4.10]
    at org.springframework.cloud.sleuth.instrument.reactor.SleuthMonoLift.subscribe(ReactorHooksHelper.java:225) [spring-cloud-sleuth-instrumentation-3.0.2-SNAPSHOT.jar:3.0.2-SNAPSHOT]
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onNext(FluxFilterFuseable.java:118) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2398) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.request(FluxFilterFuseable.java:191) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:169) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onSubscribe(MonoFlatMap.java:110) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onSubscribe(FluxFilterFuseable.java:87) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.Mono.subscribe(Mono.java:4361) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:449) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:219) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:165) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:87) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.Mono.subscribe(Mono.java:4361) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:255) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51) [reactor-core-3.4.10.jar:3.4.10]
    at org.springframework.cloud.sleuth.instrument.reactor.SleuthMonoLift.subscribe(ReactorHooksHelper.java:225) [spring-cloud-sleuth-instrumentation-3.0.2-SNAPSHOT.jar:3.0.2-SNAPSHOT]
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.Mono.subscribe(Mono.java:4361) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:255) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51) [reactor-core-3.4.10.jar:3.4.10]
    at org.springframework.cloud.sleuth.instrument.reactor.SleuthMonoLift.subscribe(ReactorHooksHelper.java:225) [spring-cloud-sleuth-instrumentation-3.0.2-SNAPSHOT.jar:3.0.2-SNAPSHOT]
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.Mono.subscribe(Mono.java:4361) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:82) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onComplete(MonoFlatMap.java:181) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.FluxFilter$FilterSubscriber.onComplete(FluxFilter.java:166) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2400) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.FluxFilter$FilterSubscriber.request(FluxFilter.java:186) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onSubscribe(MonoFlatMap.java:110) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.FluxFilter$FilterSubscriber.onSubscribe(FluxFilter.java:85) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.MonoCurrentContext.subscribe(MonoCurrentContext.java:36) [reactor-core-3.4.10.jar:3.4.10]
    at org.springframework.cloud.sleuth.instrument.reactor.SleuthMonoLift.subscribe(ReactorHooksHelper.java:225) [spring-cloud-sleuth-instrumentation-3.0.2-SNAPSHOT.jar:3.0.2-SNAPSHOT]
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.Mono.subscribe(Mono.java:4361) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:255) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51) [reactor-core-3.4.10.jar:3.4.10]
    at org.springframework.cloud.sleuth.instrument.reactor.SleuthMonoLift.subscribe(ReactorHooksHelper.java:225) [spring-cloud-sleuth-instrumentation-3.0.2-SNAPSHOT.jar:3.0.2-SNAPSHOT]
    at reactor.core.publisher.Mono.subscribe(Mono.java:4361) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:82) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.Operators.complete(Operators.java:137) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:195) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:53) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:57) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.Mono.subscribe(Mono.java:4361) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:255) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51) [reactor-core-3.4.10.jar:3.4.10]
    at org.springframework.cloud.sleuth.instrument.reactor.SleuthMonoLift.subscribe(ReactorHooksHelper.java:225) [spring-cloud-sleuth-instrumentation-3.0.2-SNAPSHOT.jar:3.0.2-SNAPSHOT]
    at reactor.core.publisher.Mono.subscribe(Mono.java:4361) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:82) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onComplete(FluxMapFuseable.java:150) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onComplete(FluxMapFuseable.java:150) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.Operators.complete(Operators.java:137) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:195) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:53) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:57) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.Mono.subscribe(Mono.java:4361) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:255) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51) [reactor-core-3.4.10.jar:3.4.10]
    at org.springframework.cloud.sleuth.instrument.reactor.SleuthMonoLift.subscribe(ReactorHooksHelper.java:225) [spring-cloud-sleuth-instrumentation-3.0.2-SNAPSHOT.jar:3.0.2-SNAPSHOT]
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.Mono.subscribe(Mono.java:4361) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.MonoIgnorePublisher.subscribe(MonoIgnorePublisher.java:57) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.FluxFirstWithSignal$FirstEmittingSubscriber.onNext(FluxFirstWithSignal.java:330) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.core.publisher.MonoCreate$DefaultMonoSink.success(MonoCreate.java:160) [reactor-core-3.4.10.jar:3.4.10]
    at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:114) [rsocket-core-1.1.1.jar:?]
    at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:19) [rsocket-core-1.1.1.jar:?]
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120) [reactor-core-3.4.10.jar:3.4.10]
    at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:279) [reactor-netty-core-1.0.11.jar:1.0.11]
    at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:388) [reactor-netty-core-1.0.11.jar:1.0.11]
    at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:404) [reactor-netty-core-1.0.11.jar:1.0.11]
    at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:579) [reactor-netty-http-1.0.11.jar:1.0.11]
    at reactor.netty.http.server.WebsocketServerOperations.onInboundNext(WebsocketServerOperations.java:167) [reactor-netty-http-1.0.11.jar:1.0.11]
    at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93) [reactor-netty-core-1.0.11.jar:1.0.11]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.68.Final.jar:4.1.68.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.68.Final.jar:4.1.68.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [netty-transport-4.1.68.Final.jar:4.1.68.Final]
    at io.rsocket.transport.netty.server.BaseWebsocketServerTransport$PongHandler.channelRead(BaseWebsocketServerTransport.java:63) [rsocket-transport-netty-1.1.1.jar:?]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.68.Final.jar:4.1.68.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.68.Final.jar:4.1.68.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [netty-transport-4.1.68.Final.jar:4.1.68.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) [netty-codec-4.1.68.Final.jar:4.1.68.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) [netty-codec-4.1.68.Final.jar:4.1.68.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.68.Final.jar:4.1.68.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.68.Final.jar:4.1.68.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [netty-transport-4.1.68.Final.jar:4.1.68.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) [netty-transport-4.1.68.Final.jar:4.1.68.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.68.Final.jar:4.1.68.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.68.Final.jar:4.1.68.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) [netty-transport-4.1.68.Final.jar:4.1.68.Final]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) [netty-transport-4.1.68.Final.jar:4.1.68.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719) [netty-transport-4.1.68.Final.jar:4.1.68.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) [netty-transport-4.1.68.Final.jar:4.1.68.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) [netty-transport-4.1.68.Final.jar:4.1.68.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) [netty-transport-4.1.68.Final.jar:4.1.68.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) [netty-common-4.1.68.Final.jar:4.1.68.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.1.68.Final.jar:4.1.68.Final]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.68.Final.jar:4.1.68.Final]
    at java.lang.Thread.run(Thread.java:831) [?:?]
4

1 回答 1

0

当我错误配置我的安全配置时,我遇到了完全相同的异常。

产生相同异常的配置如下所示:

    // Configuration the author reports as reproducing the exception: the authentication
    // manager is set directly on RSocketSecurity and no jwt { } spec is configured.
    return rSocketSecurity.authorizePayload { authorize: AuthorizePayloadsSpec ->
        authorize
            .anyRequest().authenticated()
            .anyExchange().permitAll()
    }.authenticationManager(jwtReactiveAuthenticationManager)
        .build()

正确的安全配置是:

    // Configuration the author reports as working: the same authorization rules, but the
    // authentication manager is registered through the jwt { } spec.
    return rSocketSecurity.authorizePayload { authorize: AuthorizePayloadsSpec ->
        authorize
            .anyRequest().authenticated()
            .anyExchange().permitAll()
    }.jwt { jwtSpec: RSocketSecurity.JwtSpec ->
        try {
            jwtSpec.authenticationManager(jwtReactiveAuthenticationManager)
        } catch (e: Exception) {
            // Rethrow unchecked, keeping the original exception as the cause.
            throw RuntimeException(e)
        }
    }.build()
于 2021-11-25T08:49:20.410 回答