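The problem I am facing is as follows:

  1. In the Main class, I try to connect to a server and attach a channel listener for future actions.
  2. If the connection is established successfully, the SSL handshake completes without any problem.
  3. But if the connection in step 1 fails, I try to connect to the same or a different server and attach the same channel listener again, as in step 1.
  4. I expect that once the connection is established, the SSL handshake happens as in step 2. It does not, even when I forcibly call the renegotiate method on SslHandler.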

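Expected behavior

If a connection exception occurs while connecting to the server with the Bootstrap object, the SSL handshake should still take place on the retried connection.

Actual behavior

It skips the SSL handshake on retry and fails with an unknown message type (ByteBuf) error.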

Steps to reproduce

  1. The Main class that makes the connection
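    // Netty, SLF4J, and JDK imports. ClientConfig, AgentStatsProcess,
    // ServerChannelFutureListener, and AgentStatsChannelInitializationHandler
    // are classes from the asker's project.
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.ChannelPromise;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.util.internal.logging.InternalLoggerFactory;
    import io.netty.util.internal.logging.Slf4JLoggerFactory;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.net.InetSocketAddress;
    import java.util.Objects;
    import java.util.concurrent.TimeUnit;
    
    public class Main {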
    private final static Logger LOGGER = LoggerFactory.getLogger(Main.class);    
    public static void main(String[] args) {
    
        ClientConfig clientConfig = null;
    
        LOGGER.info("initializing Agent Stats uploader");
        // Set up.
        InternalLoggerFactory.setDefaultFactory(Slf4JLoggerFactory.INSTANCE);
        Bootstrap clientBootstrap = getBootstrap();
    
        clientConfig = ClientConfig.getInstance();
    
    
        InetSocketAddress server = clientConfig.getPrimaryScnInetAddrs();
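        // Fail fast on a missing address; Objects.nonNull() only returns a boolean.
        Objects.requireNonNull(server, "primary server address");
        Objects.requireNonNull(server.getHostName(), "primary server host name");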
    
        // Make a new connection.
        LOGGER.info("Initialization complete, ready to connect to the host and port  {}:{}", server.getHostName(),
                server.getPort());
    
        ServerChannelFutureListener serverChannelFutureListener = ServerChannelFutureListener.getInstance();
        serverChannelFutureListener.setClientBootStrap(clientBootstrap);
    
        ChannelPromise channelPromise =
                (ChannelPromise) clientBootstrap.connect(server).addListener(serverChannelFutureListener);
    
        EventLoopGroup eventGroupExecutor = clientBootstrap.config().group();
        AgentStatsProcess agentStatsThread = AgentStatsProcess.getInstance();
        agentStatsThread.setParentChannelFuture(channelPromise);
        eventGroupExecutor.scheduleAtFixedRate(agentStatsThread, clientConfig.getInitialDelay(),
                clientConfig.getScheduleInterval(), TimeUnit.SECONDS);
        LOGGER.info("Scheduled Agent Stats uploading, should start in 30 secs");
    
        LOGGER.info("Connection complete");
    
    
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                LOGGER.info("Killing AgentStatUploader Thread");
                eventGroupExecutor.shutdownGracefully();
        }));
    }
    
    public static final Bootstrap getBootstrap() {
        EventLoopGroup group = new NioEventLoopGroup();
    
        Bootstrap b = new Bootstrap();
        b.group(group);
        b.channel(NioSocketChannel.class);
        b.handler(new AgentStatsChannelInitializationHandler());
    
        b.option(ChannelOption.SO_KEEPALIVE, true);
        b.option(ChannelOption.TCP_NODELAY, true);
    
        return b;
    }
    
    }
  2. The ChannelFuture listener attached in step 1, which implements the retry logic
    // Netty, SLF4J, and JDK imports. ClientConfig, AgentStatsProcess, and
    // ExpontentialBackOff are classes from the asker's project.
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.util.concurrent.GenericFutureListener;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.net.InetSocketAddress;
    import java.util.concurrent.atomic.AtomicInteger;
    
    public final class ServerChannelFutureListener implements GenericFutureListener<ChannelFuture> {
        private static final Logger logger = LoggerFactory.getLogger(ServerChannelFutureListener.class.getName());
        private static ServerChannelFutureListener instance;
    
        private AtomicInteger count = new AtomicInteger(1);
        private ClientConfig clientConfig = ClientConfig.getInstance();
        private boolean isPrimary = true;
        private ChannelFuture channelFuture;
        private Bootstrap clientBootStrap;
        private long timeout;
    
        private ServerChannelFutureListener() {
            this.timeout = clientConfig.getRetryAfter();
        }
    
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            channelFuture = future;
            int maxretries = clientConfig.getMaxRetries();
            if (!future.isSuccess()) {
                logger.info("Connection to {} scn is not successful, retrying ({}/{})", getServerType(), count.get(), maxretries);
                logger.debug("Connection to server is failed with error: ", future.cause());
                if (count.incrementAndGet() > maxretries) {
                    // fails to connect even after max-retries, try to connect to next server.
                    logger.info("Failed to connect to {} server, will try to connect to {} now.",
                            getServerType(),
                            isPrimary() ? "SECONDARY" : "PRIMARY");
    
                    count.getAndSet(1);
                    isPrimary = !isPrimary();
                    this.timeout = clientConfig.getRetryAfter();
                    logger.info("Connecting Server type changed, so resetting timeout: {}", this.timeout);
                } else {
                    // retry
                    logger.info("Exponential Back-off set to: {} secs, waiting for next server connection", this.timeout);
                    //TimeUnit.SECONDS.sleep(this.timeout);
                    this.timeout = ExpontentialBackOff.getNextBackOff(this.timeout);
                }
    
                // Reconnect with the same Bootstrap and re-attach this listener.
                InetSocketAddress server = getServer();
                logger.info("Initialization complete, ready to connect to the host and port  {}:{}", server.getHostName(),
                        server.getPort());
                channelFuture = clientBootStrap.connect(server).addListener(this);
            } else {
                logger.info("Using Connection with config: {}, to Server {} ", future.channel().config(),
                        future.channel().localAddress());
                this.timeout = clientConfig.getRetryAfter();
                logger.info("Time out Back-off reset to: {} for next server connection", this.timeout);
            }
            AgentStatsProcess.getInstance().setParentChannelFuture(channelFuture);
        }
    
        private String getServerType() {
            return isPrimary() ? "PRIMARY" : "SECONDARY";
        }
    
        private InetSocketAddress getServer() {
            return isPrimary() ? clientConfig.getPrimaryScnInetAddrs() : clientConfig.getSecondaryScnInetAddrs();
        }
    
        public static ServerChannelFutureListener getInstance() {
            if (null == instance) {
                instance = new ServerChannelFutureListener();
            }
            return instance;
        }
    
        public boolean isPrimary() {
            return isPrimary;
        }
    
        public ChannelFuture getChannelFuture() {
            return channelFuture;
        }
    
        public void setClientBootStrap(Bootstrap cb) {
            this.clientBootStrap = cb;
        }
    }
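
The retry decisions made by the listener above can be separated from Netty and checked on their own. What follows is a minimal sketch, not the asker's code: `RetryPolicyModel` and its methods are hypothetical names, the doubling back-off is an assumption because `ExpontentialBackOff.getNextBackOff` is not shown in the question, and returning a zero delay when switching servers is a choice made only for this sketch.

```java
/** Hypothetical, Netty-free model of the listener's retry decisions. */
class RetryPolicyModel {
    private final int maxRetries;
    private final long initialDelaySecs;
    private int attempt = 1;       // mirrors the listener's "count" field
    private boolean primary = true;
    private long delaySecs;        // mirrors the listener's "timeout" field

    RetryPolicyModel(int maxRetries, long initialDelaySecs) {
        this.maxRetries = maxRetries;
        this.initialDelaySecs = initialDelaySecs;
        this.delaySecs = initialDelaySecs;
    }

    /** Records a failed connect and returns the seconds to wait before the next attempt. */
    long onFailure() {
        if (++attempt > maxRetries) {
            // Retries exhausted: switch between primary and secondary and start over.
            attempt = 1;
            primary = !primary;
            delaySecs = initialDelaySecs;
            return 0; // sketch-only choice: try the other server right away
        }
        long wait = delaySecs;
        delaySecs = delaySecs * 2; // assumption: doubling back-off
        return wait;
    }

    /** Records a successful connect; like the listener, it resets only the delay. */
    void onSuccess() {
        delaySecs = initialDelaySecs;
    }

    boolean isPrimary() {
        return primary;
    }
}

class RetryPolicyDemo {
    public static void main(String[] args) {
        RetryPolicyModel policy = new RetryPolicyModel(3, 1);
        for (int i = 0; i < 5; i++) {
            long wait = policy.onFailure();
            System.out.println("wait " + wait + "s, next target: "
                    + (policy.isPrimary() ? "PRIMARY" : "SECONDARY"));
        }
    }
}
```

Note that the listener in the question has its sleep commented out, so it reconnects immediately. If a delay is wanted, scheduling the reconnect on the channel's event loop with `schedule(...)` avoids blocking the I/O thread, which a `sleep` inside `operationComplete` would do.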

I expect the SSL handshake to happen when a reconnect is attempted after a failed connection.

Netty version: 4.1.12.Final

1 Answer

I fixed the issue. The culprit was "ProtobufVarint32FrameDecoder" and its parent class "ByteToMessageDecoder". "ByteToMessageDecoder" ensures that its subclasses are not sharable.

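Because these classes are not sharable, every time the code tried to reconnect using the bootstrap, the initializer class failed to add the handler to the pipeline, which led to "ctx.close()" and a pipeline with no handlers.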
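
The failure mode described here can be reproduced without Netty. The sketch below is a simplified model, not Netty's API: every class name in it is a hypothetical stand-in. It assumes the channel initializer keeps a single decoder instance and reuses it for every connection attempt, which the question's code does not show. In real Netty code, the usual counterpart of the working variant is to create the decoder with `new` inside `initChannel`, so that each channel gets its own instance.

```java
/** Stand-in for a handler that, like ByteToMessageDecoder, is not sharable. */
class FrameDecoderModel {
    boolean added; // set once the handler has been placed in a pipeline
}

/** Stand-in for a pipeline that rejects a non-sharable handler it has seen before. */
class PipelineModel {
    private int handlerCount;

    void addLast(FrameDecoderModel handler) {
        if (handler.added) {
            throw new IllegalStateException(
                    "handler is not sharable, so it cannot be added to a second pipeline");
        }
        handler.added = true;
        handlerCount++;
    }

    int handlerCount() {
        return handlerCount;
    }
}

/** Stand-in for a channel initializer; invoked once per connection attempt. */
interface InitializerModel {
    void initChannel(PipelineModel pipeline);
}

/** Failing variant: one decoder instance reused for every connection attempt. */
class SharedInstanceInitializer implements InitializerModel {
    private final FrameDecoderModel decoder = new FrameDecoderModel();

    @Override
    public void initChannel(PipelineModel pipeline) {
        pipeline.addLast(decoder);
    }
}

/** Working variant: a fresh decoder for every connection attempt. */
class PerChannelInitializer implements InitializerModel {
    @Override
    public void initChannel(PipelineModel pipeline) {
        pipeline.addLast(new FrameDecoderModel());
    }
}

class SharableCheckDemo {
    /** Returns true if a second pipeline can be initialized after the first. */
    static boolean secondConnectSucceeds(InitializerModel initializer) {
        initializer.initChannel(new PipelineModel()); // first connection attempt
        try {
            initializer.initChannel(new PipelineModel()); // reconnect gets a new pipeline
            return true;
        } catch (IllegalStateException e) {
            return false; // the real initializer would close the channel at this point
        }
    }

    public static void main(String[] args) {
        System.out.println("shared instance:   "
                + secondConnectSucceeds(new SharedInstanceInitializer()));
        System.out.println("fresh per channel: "
                + secondConnectSucceeds(new PerChannelInitializer()));
    }
}
```

The first connection works in both variants, which matches the observation that the handshake only goes missing on a retry.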

I worked around it by adding these two classes to my project, and I raised bug #10371 for this.

Answered 2020-06-24T10:53:35.700