我在 Spring Boot (2.5.3) 中使用 rsocket 启动器。我有一个请求者和响应者设置并且它工作得很好(对于所有类型的 rsocket 通信)。问题是当我试图在请求者端实现可恢复性选项时。
这是请求者的代码
@Bean
public RSocketRequester rSocketRequester(RSocketRequester.Builder builder) {
Resume resume = new Resume()
.sessionDuration(Duration.ofMinutes(15))
.streamTimeout(Duration.ofMinutes(15))
.retry(Retry.fixedDelay(Long.MAX_VALUE, Duration.ofSeconds(5))
.doBeforeRetry(s -> System.out.println("Resume: Before Retry : "+s))
.doAfterRetry(s -> System.out.println("Resume: After Retry : "+s))
);
return builder
.dataMimeType(MimeType.valueOf("application/json"))
.rsocketConnector(rSocketConnector -> {
rSocketConnector.reconnect(Retry.fixedDelay(10, Duration.ofSeconds(20)).doBeforeRetry(s -> {
System.out.println("Reconnect : Disconnected. Trying to reconnect..."+s);
}));
rSocketConnector.resume(resume);
})
.websocket(URI.create("ws://localhost:9190/api/workflow/realtime"));
}
在响应者方面:
@Bean
RSocketServerCustomizer rSocketResume() {
Resume resume = new Resume()
.sessionDuration(Duration.ofMinutes(15))
.streamTimeout(Duration.ofMinutes(15))
.retry(Retry.fixedDelay(Long.MAX_VALUE, Duration.ofSeconds(5))
.doBeforeRetry(s -> System.out.println("Resume: Before Retry : "+s))
.doAfterRetry(s -> System.out.println("Resume: After Retry : "+s))
);
return rSocketServer -> rSocketServer.resume(resume );
}
但是当响应者宕机并活着回来时,我在请求者控制台上得到以下错误:
**[----> Responder dead at this point]**
Resume: Before Retry : attempt #1 (1 in a row), last failure={io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:9190}
Resume: After Retry : attempt #1 (1 in a row), last failure={io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:9190}
Resume: Before Retry : attempt #2 (2 in a row), last failure={io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:9190}
Resume: After Retry : attempt #2 (2 in a row), last failure={io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:9190}
Resume: Before Retry : attempt #3 (3 in a row), last failure={io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:9190}
Resume: After Retry : attempt #3 (3 in a row), last failure={io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:9190}
**[<---- Responder back to live at this point]**
2021-08-08 04:48:39.404 ERROR 24748 --- [actor-tcp-nio-5] reactor.core.publisher.Operators : Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: RejectedResumeException (0x4): unknown resume token
Caused by: io.rsocket.exceptions.RejectedResumeException: unknown resume token
at io.rsocket.exceptions.Exceptions.from(Exceptions.java:64) ~[rsocket-core-1.1.1.jar:na]
at io.rsocket.resume.ClientRSocketSession.tryReestablishSession(ClientRSocketSession.java:271) ~[rsocket-core-1.1.1.jar:na]
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:196) ~[reactor-core-3.4.8.jar:3.4.8]
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816) ~[reactor-core-3.4.8.jar:3.4.8]
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249) ~[reactor-core-3.4.8.jar:3.4.8]
at reactor.core.publisher.FluxFirstWithSignal$FirstEmittingSubscriber.onNext(FluxFirstWithSignal.java:330) ~[reactor-core-3.4.8.jar:3.4.8]
at reactor.core.publisher.MonoCreate$DefaultMonoSink.success(MonoCreate.java:160) ~[reactor-core-3.4.8.jar:3.4.8]
at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:114) ~[rsocket-core-1.1.1.jar:na]
at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:19) ~[rsocket-core-1.1.1.jar:na]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120) ~[reactor-core-3.4.8.jar:3.4.8]
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:279) ~[reactor-netty-core-1.0.9.jar:1.0.9]
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:388) ~[reactor-netty-core-1.0.9.jar:1.0.9]
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:404) ~[reactor-netty-core-1.0.9.jar:1.0.9]
at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:706) ~[reactor-netty-http-1.0.9.jar:1.0.9]
at reactor.netty.http.client.WebsocketClientOperations.onInboundNext(WebsocketClientOperations.java:159) ~[reactor-netty-http-1.0.9.jar:1.0.9]
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93) ~[reactor-netty-core-1.0.9.jar:1.0.9]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.66.Final.jar:4.1.66.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.66.Final.jar:4.1.66.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.66.Final.jar:4.1.66.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) ~[netty-codec-4.1.66.Final.jar:4.1.66.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) ~[netty-codec-4.1.66.Final.jar:4.1.66.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.66.Final.jar:4.1.66.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.66.Final.jar:4.1.66.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.66.Final.jar:4.1.66.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.66.Final.jar:4.1.66.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.66.Final.jar:4.1.66.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.66.Final.jar:4.1.66.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.66.Final.jar:4.1.66.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.66.Final.jar:4.1.66.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719) ~[netty-transport-4.1.66.Final.jar:4.1.66.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) ~[netty-transport-4.1.66.Final.jar:4.1.66.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) ~[netty-transport-4.1.66.Final.jar:4.1.66.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.66.Final.jar:4.1.66.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) ~[netty-common-4.1.66.Final.jar:4.1.66.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.66.Final.jar:4.1.66.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.66.Final.jar:4.1.66.Final]
at java.lang.Thread.run(Unknown Source) ~[na:1.8.0_261]
2021-08-08 04:48:39.407 ERROR 24748 --- [actor-tcp-nio-5] reactor.Flux.Map.1 : onError(RejectedResumeException (0x4): unknown resume token)
谁能建议为什么这种行为?另外我想了解简历上的这个标记是什么?
谢谢。