我正在尝试测试 RSocket 的重新连接功能,但是当服务器在两者之间关闭并重新启动时它不起作用。
第 1 步:在服务器关闭时启动客户端
16:02:35.351 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.processId: 55104 (auto-detected)
16:02:35.515 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.machineId: 10:e7:c6:ff:fe:31:38:c0 (auto-detected)
16:02:35.560 [reactor-tcp-nio-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id: 0x7b01559f] Created a new pooled channel, now 1 active connections and 0 inactive connections
16:02:35.569 [reactor-tcp-nio-2] DEBUG reactor.netty.transport.TransportConfig - [id: 0x7b01559f] Initialized pipeline DefaultChannelPipeline{(reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
doAfterRetry ===>io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:7999
16:02:36.619 [reactor-tcp-nio-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id: 0x8a39ea36] Created a new pooled channel, now 1 active connections and 0 inactive connections
16:02:36.620 [reactor-tcp-nio-2] DEBUG reactor.netty.transport.TransportConfig - [id: 0x8a39ea36] Initialized pipeline DefaultChannelPipeline{(reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
doAfterRetry ===>io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:7999
16:02:37.625 [reactor-tcp-nio-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id: 0x9754e5dd] Created a new pooled channel, now 1 active connections and 0 inactive connections
16:02:37.625 [reactor-tcp-nio-2] DEBUG reactor.netty.transport.TransportConfig - [id: 0x9754e5dd] Initialized pipeline DefaultChannelPipeline{(reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
doAfterRetry ===>io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:7999
第 2 步:服务器已启动。重新连接成功
16:08:31.359 [reactor-tcp-nio-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id: 0x67b9f3a1] Created a new pooled channel, now 1 active connections and 0 inactive connections
16:08:31.359 [reactor-tcp-nio-2] DEBUG reactor.netty.transport.TransportConfig - [id: 0x67b9f3a1] Initialized pipeline DefaultChannelPipeline{(reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
16:08:31.862 [reactor-tcp-nio-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id: 0x67b9f3a1, L:/127.0.0.1:57689 - R:localhost/127.0.0.1:7999] Registering pool release on close event for channel
16:08:31.863 [reactor-tcp-nio-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id: 0x67b9f3a1, L:/127.0.0.1:57689 - R:localhost/127.0.0.1:7999] Channel connected, now 1 active connections and 0 inactive connections
16:08:31.863 [reactor-tcp-nio-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id: 0x67b9f3a1, L:/127.0.0.1:57689 - R:localhost/127.0.0.1:7999] onStateChange(PooledConnection{channel=[id: 0x67b9f3a1, L:/127.0.0.1:57689 - R:localhost/127.0.0.1:7999]}, [connected])
16:08:31.865 [reactor-tcp-nio-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id: 0x67b9f3a1, L:/127.0.0.1:57689 - R:localhost/127.0.0.1:7999] onStateChange(ChannelOperations{PooledConnection{channel=[id: 0x67b9f3a1, L:/127.0.0.1:57689 - R:localhost/127.0.0.1:7999]}}, [configured])
16:08:31.891 [reactor-tcp-nio-2] DEBUG io.rsocket.FrameLogger - sending ->
Frame => Stream ID: 0 Type: SETUP Flags: 0b0 Length: 75
Data:
16:08:31.957 [reactor-tcp-nio-2] DEBUG reactor.netty.channel.FluxReceive - [id: 0x67b9f3a1, L:/127.0.0.1:57689 - R:localhost/127.0.0.1:7999] FluxReceive{pending=0, cancelled=false, inboundDone=false, inboundError=null}: subscribing inbound receiver
16:08:31.969 [reactor-tcp-nio-2] DEBUG io.rsocket.FrameLogger - sending ->
Frame => Stream ID: 1 Type: REQUEST_STREAM Flags: 0b100000000 Length: 57 InitialRequestN: 9223372036854775807
Metadata:
第三步:关闭服务器
16:10:10.993 [reactor-tcp-nio-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id: 0x67b9f3a1, L:/127.0.0.1:57689 ! R:localhost/127.0.0.1:7999] Channel closed, now 0 active connections and 0 inactive connections
null
16:10:10.999 [reactor-tcp-nio-2] DEBUG org.springframework.core.codec.CharSequenceEncoder - Writing "ClientName:1607847010998"
16:10:11.008 [reactor-tcp-nio-2] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.nio.channels.ClosedChannelException
Caused by: java.nio.channels.ClosedChannelException: null
16:10:11.008 [reactor-tcp-nio-2] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.nio.channels.ClosedChannelException
Caused by: java.nio.channels.ClosedChannelException: null
16:10:11.010 [reactor-tcp-nio-2] DEBUG reactor.netty.ReactorNetty - [id: 0x67b9f3a1, L:/127.0.0.1:57689 ! R:localhost/127.0.0.1:7999] Non Removed handler: RSocketLengthCodec, context: ChannelHandlerContext(RSocketLengthCodec, [id: 0x67b9f3a1, L:/127.0.0.1:57689 ! R:localhost/127.0.0.1:7999]), pipeline: DefaultChannelPipeline{(RSocketLengthCodec = io.rsocket.transport.netty.RSocketLengthCodec), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
16:10:11.010 [reactor-tcp-nio-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id: 0x67b9f3a1, L:/127.0.0.1:57689 ! R:localhost/127.0.0.1:7999] onStateChange(ChannelOperations{PooledConnection{channel=[id: 0x67b9f3a1, L:/127.0.0.1:57689 ! R:localhost/127.0.0.1:7999]}}, [disconnecting])
在上述步骤之后,RSocket 没有重新连接。下面是我的程序。有人可以帮助审查和建议这有什么问题。
RSocketStrategies strategies = RSocketStrategies.builder()
.encoders(e -> e.add(new Jackson2JsonEncoder()))
.decoders(e -> e.add(new Jackson2JsonDecoder()))
.build();
RSocketRequester r = RSocketRequester.builder()
.rsocketConnector(connector ->
connector.reconnect(Retry.indefinitely().doAfterRetry(e-> System.out.println("doAfterRetry ===>"+e.failure())))
).dataMimeType(MediaType.APPLICATION_JSON)
.rsocketStrategies(strategies)
.tcp("localhost", 7999);
12 月 15 日更新
以下代码在步骤 2 之后发送请求。断开连接后,它无法恢复流。我确信我的代码中一定遗漏了一些东西。请帮助
requester.route("route_name")
.data("RequestData")
.retrieveFlux(MyResponse.class)
.doOnError(ex ->{
System.out.println("doOnError"+ex);
}).doOnCancel(()->{
System.out.println("doOnCancel");
}).doOnComplete(()-> {
System.out.println("doOnCancel");
})
.subscribe(result -> {
System.out.println("===>"+result);
});