0

我使用 Netty 4 创建了一个相当简单的服务器。我已经能够将其扩展为处理数千个连接,并且它永远不会超过约 40 个线程。

服务器线程数保持不变(通过 Java VisualVM)

为了测试它,我还创建了一个创建数千个连接的测试客户端。不幸的是,这会创建与建立连接一样多的线程。我希望为客户减少线程。为此,我查看了很多帖子。许多示例显示了单连接设置。 说要在客户之间共享 NioEventLoopGroup,我就是这样做的。我得到的 nioEventLoopGroup 数量有限,但在其他地方每个连接都有一个线程。我不是故意在管道中创建线程,也不知道可能是什么。

客户端线程数随连接数增长(通过 Java VisualVM)

线程(通过 Java VisualVM)

这是我的客户端代码设置的一个片段。根据我迄今为止的研究,它似乎应该保持一个固定的线程数。有没有什么我应该做的事情来防止每个客户端连接的线程?

主要的

final EventLoopGroup group = new NioEventLoopGroup();

for (int i=0; i<100; i++)){
    MockClient client = new MockClient(i, group);
    client.connect();
}

模拟客户端

public class MockClient implements Runnable {

    private final EventLoopGroup group;

    private int identity;

    public MockClient(int identity, final EventLoopGroup group) {
        this.identity = identity;
        this.group = group;
    }

    @Override
    public void run() {
        try {
            connect();
        } catch (Exception e) {}
    }

    public void connect() throws Exception{

        Bootstrap b = new Bootstrap();
        b.group(group)
         .channel(NioSocketChannel.class)
         .handler(new MockClientInitializer(identity, this));

        final Runnable that = this;
        // Start the connection attempt
        b.connect(config.getHost(), config.getPort()).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    Channel ch = future.sync().channel();
                } else {
                    //if the server is down, try again in a few seconds
                    future.channel().eventLoop().schedule(that, 15, TimeUnit.SECONDS); 
                }
            }
        });
    }
}
4

1 回答 1

1

正如我之前多次发生的那样,详细解释问题让我思考更多,我遇到了这个问题。如果其他人在创建数千个 Netty 客户端时遇到同样的问题,我想在这里提供它。

我的管道中有一条路径将创建一个超时任务来模拟客户端连接重新启动。事实证明,每当它收到来自服务器的“重新启动”信号(这种情况经常发生)时,这个计时器任务就会为每个连接创建额外的线程,直到每个连接都有一个线程。

处理程序

private final HashedWheelTimer timer;

@Override
protected void channelRead0(ChannelHandlerContext ctx, Packet msg) throws Exception {

    Packet packet = reboot();

    ChannelFutureListener closeHandler = new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            RebootTimeoutTask timeoutTask = new RebootTimeoutTask(identity, client);
            timer.newTimeout(timeoutTask, SECONDS_FOR_REBOOT, TimeUnit.SECONDS);
        }
    };

    ctx.writeAndFlush(packet).addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
                future.channel().close().addListener(closeHandler);
            } else {
                future.channel().close();
            }
        }
    });

}

超时任务

public class RebootTimeoutTask implements TimerTask {

    public RebootTimeoutTask(...) {...}

    @Override
    public void run(Timeout timeout) throws Exception {
        client.connect(identity);
    }

}
于 2014-06-13T00:17:22.877 回答