9

简单场景:

  1. 扩展 SimpleChannelUpstreamHandler 的较低级别的类 A。这个类是发送消息和接收响应的主力。
  2. 可以被系统的其他部分用来发送和接收消息的顶级B类(可以模拟同步和异步)。此类创建 ClientBootstrap,设置管道工厂,调用 bootstrap.connect() 并最终获得用于发送和接收消息的类 A 的句柄/引用。就像是:

    ChannelFuture future = bootstrap.connect();
    Channel channel = future.awaitUninterruptibly().getChannel();
    

    处理程序 = channel.getPipeline().get(A.class);

我知道在 A 类中,我可以覆盖 public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e); 这样当远程服务器关闭时,我可以得到通知。

由于通道关闭后,B类中的原始A类引用(上面的处理程序)不再有效,所以我需要用新的引用替换它。

理想情况下,我希望 A 类具有在上述覆盖的 channelClosed 方法中通知 B 类的机制,以便可以在 B 类中再次调用 bootstrap.connect。一种方法是在 A 类中引用 B 类. 为此,我需要将 B 类引用传递给 PipelineFactory,然后让 PipelineFactory 将 B 的引用传递给 A。

还有其他更简单的方法来实现相同的目标吗?

谢谢,

4

3 回答 3

19

Channel.closeFuture()返回一个ChannelFuture将在通道关闭时通知您。您可以将 a 添加ChannelFutureListener到 B 中的未来,以便您可以在那里进行另一次连接尝试。

您可能希望重复此操作,直到连接尝试最终成功:

private void doConnect() {
    Bootstrap b = ...;
    b.connect().addListener((ChannelFuture f) -> {
        if (!f.isSuccess()) {
            long nextRetryDelay = nextRetryDelay(...);
            f.channel().eventLoop().schedule(nextRetryDelay, ..., () -> {
                doConnect();
            }); // or you can give up at some point by just doing nothing.
        }
    });
}
于 2013-11-04T08:25:46.570 回答
1

我不知道这是否是正确的解决方案,但为了修复 trustin 解决方案的线程泄漏,我发现我可以在调度程序触发后关闭事件循环:

final EventLoop eventloop = f.channel().eventLoop();
b.connect().addListener((ChannelFuture f) -> {
    if (!f.isSuccess()) {
        long nextRetryDelay = nextRetryDelay(...);
        eventloop.schedule(() -> {
            doConnect();
            eventloop.shutdownGracefully();
        }, nextRetryDelay, ...);
    }
});
于 2021-01-21T16:20:07.720 回答
0

这是另一个将重新连接行为封装在一个小助手类中的版本

Bootstrap clientBootstrap...
EventLoopGroup group = new NioEventLoopGroup();

Session session = new Session(clientBootstrap,group);
Disposable shutdownHook = session.start();    

interface Disposable {
   void dispose();
}
class Session implements Disposable{    
    private final EventLoopGroup scheduler;
    private final Bootstrap clientBootstrap;

    private int reconnectDelayMs;
    private Channel activeChannel;
    private AtomicBoolean shouldReconnect;

    private Session(Bootstrap clientBootstrap, EventLoopGroup scheduler) {
        this.scheduler = scheduler;
        this.clientBootstrap = clientBootstrap;
        this.reconnectDelayMs = 1;
        this.shouldReconnect = new AtomicBoolean(true);
    }

    public Disposable start(){
        //Create a new connectFuture
        ChannelFuture connectFuture = clientBootstrap.connect();

        connectFuture.addListeners( (ChannelFuture cf)->{
            if(cf.isSuccess()){
                L.info("Connection established");
                reconnectDelayMs =1;                    
                activeChannel = cf.channel();

                //Listen to the channel closing
                var closeFuture =activeChannel.closeFuture();
                closeFuture.addListeners( (ChannelFuture closeFut)->{
                    if(shouldReconnect.get()) {
                        activeChannel.eventLoop().schedule(this::start, nextReconnectDelay(), TimeUnit.MILLISECONDS);
                    }
                    else{
                        L.info("Session has been disposed won't reconnect");
                    }
                });
            }
            else{
                int delay =nextReconnectDelay();
                L.info("Connection failed will re-attempt in {} ms",delay);
                cf.channel().eventLoop().schedule(this::start,delay , TimeUnit.MILLISECONDS);
            }
        });
        
        return this;
    }

    /**
     * Call this to end the session
     */
    @Override
    public void dispose() {
        try {
            shouldReconnect.set(false);
            scheduler.shutdownGracefully().sync();
            if(activeChannel !=null) {
                activeChannel.closeFuture().sync();
            }
        }catch(InterruptedException e){
            L.warn("Interrupted while shutting down TcpClient");
        }
    }

    private int nextReconnectDelay(){
        this.reconnectDelayMs = this.reconnectDelayMs*2;
        return Math.min(this.reconnectDelayMs, 5000);
    }
}
于 2020-11-02T18:04:17.657 回答