1

我在 Spring Boot (2.5.3) 中使用 rsocket 启动器。我有一个请求者和响应者设置并且它工作得很好(对于所有类型的 rsocket 通信)。问题是当我试图在请求者端实现可恢复性选项时。

这是请求者的代码

@Bean
    public RSocketRequester rSocketRequester(RSocketRequester.Builder builder) {
        Resume resume = new Resume()
                .sessionDuration(Duration.ofMinutes(15))
                .streamTimeout(Duration.ofMinutes(15))
                .retry(Retry.fixedDelay(Long.MAX_VALUE, Duration.ofSeconds(5))
                        .doBeforeRetry(s -> System.out.println("Resume: Before Retry : "+s))
                        .doAfterRetry(s -> System.out.println("Resume: After Retry : "+s))
                );

        return builder
                .dataMimeType(MimeType.valueOf("application/json"))
                .rsocketConnector(rSocketConnector -> {
                    rSocketConnector.reconnect(Retry.fixedDelay(10, Duration.ofSeconds(20)).doBeforeRetry(s -> {
                        System.out.println("Reconnect : Disconnected. Trying to reconnect..."+s);
                    }));
                    rSocketConnector.resume(resume);
                })
                .websocket(URI.create("ws://localhost:9190/api/workflow/realtime"));
    }

在响应者方面:

@Bean
    RSocketServerCustomizer rSocketResume() {
        Resume resume = new Resume()
                .sessionDuration(Duration.ofMinutes(15))
                .streamTimeout(Duration.ofMinutes(15))
                .retry(Retry.fixedDelay(Long.MAX_VALUE, Duration.ofSeconds(5))
                        .doBeforeRetry(s -> System.out.println("Resume: Before Retry : "+s))
                        .doAfterRetry(s -> System.out.println("Resume: After Retry : "+s))
                );
        return rSocketServer -> rSocketServer.resume(resume );
    }

但是当响应者宕机并活着回来时,我在请求者控制台上得到以下错误:

**[----> Responder dead at this point]**
Resume: Before Retry : attempt #1 (1 in a row), last failure={io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:9190}
Resume: After Retry : attempt #1 (1 in a row), last failure={io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:9190}
Resume: Before Retry : attempt #2 (2 in a row), last failure={io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:9190}
Resume: After Retry : attempt #2 (2 in a row), last failure={io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:9190}
Resume: Before Retry : attempt #3 (3 in a row), last failure={io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:9190}
Resume: After Retry : attempt #3 (3 in a row), last failure={io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:9190}

**[<---- Responder back to live at this point]**

2021-08-08 04:48:39.404 ERROR 24748 --- [actor-tcp-nio-5] reactor.core.publisher.Operators         : Operator called default onErrorDropped

reactor.core.Exceptions$ErrorCallbackNotImplemented: RejectedResumeException (0x4): unknown resume token
Caused by: io.rsocket.exceptions.RejectedResumeException: unknown resume token
    at io.rsocket.exceptions.Exceptions.from(Exceptions.java:64) ~[rsocket-core-1.1.1.jar:na]
    at io.rsocket.resume.ClientRSocketSession.tryReestablishSession(ClientRSocketSession.java:271) ~[rsocket-core-1.1.1.jar:na]
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:196) ~[reactor-core-3.4.8.jar:3.4.8]
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816) ~[reactor-core-3.4.8.jar:3.4.8]
    at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249) ~[reactor-core-3.4.8.jar:3.4.8]
    at reactor.core.publisher.FluxFirstWithSignal$FirstEmittingSubscriber.onNext(FluxFirstWithSignal.java:330) ~[reactor-core-3.4.8.jar:3.4.8]
    at reactor.core.publisher.MonoCreate$DefaultMonoSink.success(MonoCreate.java:160) ~[reactor-core-3.4.8.jar:3.4.8]
    at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:114) ~[rsocket-core-1.1.1.jar:na]
    at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:19) ~[rsocket-core-1.1.1.jar:na]
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120) ~[reactor-core-3.4.8.jar:3.4.8]
    at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:279) ~[reactor-netty-core-1.0.9.jar:1.0.9]
    at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:388) ~[reactor-netty-core-1.0.9.jar:1.0.9]
    at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:404) ~[reactor-netty-core-1.0.9.jar:1.0.9]
    at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:706) ~[reactor-netty-http-1.0.9.jar:1.0.9]
    at reactor.netty.http.client.WebsocketClientOperations.onInboundNext(WebsocketClientOperations.java:159) ~[reactor-netty-http-1.0.9.jar:1.0.9]
    at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93) ~[reactor-netty-core-1.0.9.jar:1.0.9]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.66.Final.jar:4.1.66.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.66.Final.jar:4.1.66.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.66.Final.jar:4.1.66.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) ~[netty-codec-4.1.66.Final.jar:4.1.66.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) ~[netty-codec-4.1.66.Final.jar:4.1.66.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.66.Final.jar:4.1.66.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.66.Final.jar:4.1.66.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.66.Final.jar:4.1.66.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.66.Final.jar:4.1.66.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.66.Final.jar:4.1.66.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.66.Final.jar:4.1.66.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.66.Final.jar:4.1.66.Final]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.66.Final.jar:4.1.66.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719) ~[netty-transport-4.1.66.Final.jar:4.1.66.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) ~[netty-transport-4.1.66.Final.jar:4.1.66.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) ~[netty-transport-4.1.66.Final.jar:4.1.66.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.66.Final.jar:4.1.66.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) ~[netty-common-4.1.66.Final.jar:4.1.66.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.66.Final.jar:4.1.66.Final]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.66.Final.jar:4.1.66.Final]
    at java.lang.Thread.run(Unknown Source) ~[na:1.8.0_261]

2021-08-08 04:48:39.407 ERROR 24748 --- [actor-tcp-nio-5] reactor.Flux.Map.1                       : onError(RejectedResumeException (0x4): unknown resume token)

谁能建议为什么这种行为?另外我想了解简历上的这个标记是什么?

谢谢。

4

0 回答 0