1

我正在尝试测试 RSocket 的重新连接功能,但是当服务器在两者之​​间关闭并重新启动时它不起作用。

第 1 步:在服务器关闭时启动客户端

16:02:35.351 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.processId: 55104 (auto-detected)
16:02:35.515 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.machineId: 10:e7:c6:ff:fe:31:38:c0 (auto-detected)
16:02:35.560 [reactor-tcp-nio-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id: 0x7b01559f] Created a new pooled channel, now 1 active connections and 0 inactive connections
16:02:35.569 [reactor-tcp-nio-2] DEBUG reactor.netty.transport.TransportConfig - [id: 0x7b01559f] Initialized pipeline DefaultChannelPipeline{(reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
doAfterRetry ===>io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:7999
16:02:36.619 [reactor-tcp-nio-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id: 0x8a39ea36] Created a new pooled channel, now 1 active connections and 0 inactive connections
16:02:36.620 [reactor-tcp-nio-2] DEBUG reactor.netty.transport.TransportConfig - [id: 0x8a39ea36] Initialized pipeline DefaultChannelPipeline{(reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
doAfterRetry ===>io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:7999
16:02:37.625 [reactor-tcp-nio-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id: 0x9754e5dd] Created a new pooled channel, now 1 active connections and 0 inactive connections
16:02:37.625 [reactor-tcp-nio-2] DEBUG reactor.netty.transport.TransportConfig - [id: 0x9754e5dd] Initialized pipeline DefaultChannelPipeline{(reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
doAfterRetry ===>io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:7999

第 2 步:服务器已启动。重新连接成功

16:08:31.359 [reactor-tcp-nio-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id: 0x67b9f3a1] Created a new pooled channel, now 1 active connections and 0 inactive connections
16:08:31.359 [reactor-tcp-nio-2] DEBUG reactor.netty.transport.TransportConfig - [id: 0x67b9f3a1] Initialized pipeline DefaultChannelPipeline{(reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
16:08:31.862 [reactor-tcp-nio-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id: 0x67b9f3a1, L:/127.0.0.1:57689 - R:localhost/127.0.0.1:7999] Registering pool release on close event for channel
16:08:31.863 [reactor-tcp-nio-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id: 0x67b9f3a1, L:/127.0.0.1:57689 - R:localhost/127.0.0.1:7999] Channel connected, now 1 active connections and 0 inactive connections
16:08:31.863 [reactor-tcp-nio-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id: 0x67b9f3a1, L:/127.0.0.1:57689 - R:localhost/127.0.0.1:7999] onStateChange(PooledConnection{channel=[id: 0x67b9f3a1, L:/127.0.0.1:57689 - R:localhost/127.0.0.1:7999]}, [connected])
16:08:31.865 [reactor-tcp-nio-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id: 0x67b9f3a1, L:/127.0.0.1:57689 - R:localhost/127.0.0.1:7999] onStateChange(ChannelOperations{PooledConnection{channel=[id: 0x67b9f3a1, L:/127.0.0.1:57689 - R:localhost/127.0.0.1:7999]}}, [configured])
16:08:31.891 [reactor-tcp-nio-2] DEBUG io.rsocket.FrameLogger - sending -> 
Frame => Stream ID: 0 Type: SETUP Flags: 0b0 Length: 75
Data:

16:08:31.957 [reactor-tcp-nio-2] DEBUG reactor.netty.channel.FluxReceive - [id: 0x67b9f3a1, L:/127.0.0.1:57689 - R:localhost/127.0.0.1:7999] FluxReceive{pending=0, cancelled=false, inboundDone=false, inboundError=null}: subscribing inbound receiver
16:08:31.969 [reactor-tcp-nio-2] DEBUG io.rsocket.FrameLogger - sending -> 
Frame => Stream ID: 1 Type: REQUEST_STREAM Flags: 0b100000000 Length: 57 InitialRequestN: 9223372036854775807
Metadata:

第三步:关闭服务器

16:10:10.993 [reactor-tcp-nio-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id: 0x67b9f3a1, L:/127.0.0.1:57689 ! R:localhost/127.0.0.1:7999] Channel closed, now 0 active connections and 0 inactive connections
null
16:10:10.999 [reactor-tcp-nio-2] DEBUG org.springframework.core.codec.CharSequenceEncoder - Writing "ClientName:1607847010998"
16:10:11.008 [reactor-tcp-nio-2] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.nio.channels.ClosedChannelException
Caused by: java.nio.channels.ClosedChannelException: null
16:10:11.008 [reactor-tcp-nio-2] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.nio.channels.ClosedChannelException
Caused by: java.nio.channels.ClosedChannelException: null
16:10:11.010 [reactor-tcp-nio-2] DEBUG reactor.netty.ReactorNetty - [id: 0x67b9f3a1, L:/127.0.0.1:57689 ! R:localhost/127.0.0.1:7999] Non Removed handler: RSocketLengthCodec, context: ChannelHandlerContext(RSocketLengthCodec, [id: 0x67b9f3a1, L:/127.0.0.1:57689 ! R:localhost/127.0.0.1:7999]), pipeline: DefaultChannelPipeline{(RSocketLengthCodec = io.rsocket.transport.netty.RSocketLengthCodec), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
16:10:11.010 [reactor-tcp-nio-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id: 0x67b9f3a1, L:/127.0.0.1:57689 ! R:localhost/127.0.0.1:7999] onStateChange(ChannelOperations{PooledConnection{channel=[id: 0x67b9f3a1, L:/127.0.0.1:57689 ! R:localhost/127.0.0.1:7999]}}, [disconnecting])

在上述步骤之后,RSocket 没有重新连接。下面是我的程序。有人可以帮助审查和建议这有什么问题。

        RSocketStrategies strategies = RSocketStrategies.builder()
                .encoders(e -> e.add(new Jackson2JsonEncoder()))
                .decoders(e -> e.add(new Jackson2JsonDecoder()))
                .build();

        RSocketRequester r = RSocketRequester.builder()
                .rsocketConnector(connector ->
                        connector.reconnect(Retry.indefinitely().doAfterRetry(e->  System.out.println("doAfterRetry ===>"+e.failure())))
                ).dataMimeType(MediaType.APPLICATION_JSON)
                .rsocketStrategies(strategies)
                .tcp("localhost", 7999);

12 月 15 日更新

以下代码在步骤 2 之后发送请求。断开连接后,它无法恢复流。我确信我的代码中一定遗漏了一些东西。请帮助

requester.route("route_name")
                .data("RequestData")
                .retrieveFlux(MyResponse.class)
                .doOnError(ex ->{
                    System.out.println("doOnError"+ex);
                }).doOnCancel(()->{
                    System.out.println("doOnCancel");
                }).doOnComplete(()-> {
                    System.out.println("doOnCancel");
                })
                .subscribe(result -> {
                    System.out.println("===>"+result);
                });
4

1 回答 1

0

reconnect 方法有大量的 Javadoc。此功能的主要目的是建立单个共享连接,无论一次可能有多少订阅者:

启用此功能后,此类的连接方法会返回一个特殊的 Mono,该 Mono 维护一个共享的 RSocket

决定在Retry放弃之前尝试连接多长时间,但是一旦停止尝试,或者一旦建立连接然后丢失,它就不会自动尝试再次连接。

单个请求的下游订阅者仍然需要他们自己的重试逻辑来确定是否或何时重试失败的请求,这反过来又会触发共享重新连接

因此,每个单独请求的需求决定是否甚至尝试再次连接,而Retry给定的reconnect决定每个共享重新连接的重试逻辑。

请务必查看 Javadoc 中的相应代码片段。

于 2020-12-15T16:24:51.540 回答