2

免责声明 - 我不是 Java 程序员。很可能我需要根据给出的任何建议做功课,但我很乐意这样做:)

也就是说,我编写了一个完整的数据库支持的套接字服务器,它在我的小型测试中工作得很好,现在我正在为初始版本做准备。因为我不太了解 Java/Netty/BoneCP——我不知道我是否在某个地方犯了一个根本性的错误,甚至会在我的服务器出现之前就对其造成伤害。

例如,我不知道 executor group 究竟做了什么以及我应该使用什么数字。是否可以将 BoneCP 实现为单例,是否真的有必要为每个数据库查询提供所有这些 try/catch?等等

我试图将我的整个服务器简化为一个与真实事物相同的基本示例(我在文本中删除了所有内容,没有在 java 本身中进行测试,因此请原谅任何语法错误)。

基本思想是客户端可以连接,与服务器交换消息,断开其他客户端,并无限期地保持连接,直到他们选择或被迫断开连接。(客户端将每分钟发送一次 ping 消息以保持连接处于活动状态)

除了未测试此示例之外,唯一的主要区别是 clientID 的设置方式(安全地假设每个连接的客户端确实是唯一的)以及在检查值等方面有更多的业务逻辑。

底线 -可以做任何事情来改进这一点,以便它可以处理尽可能多的并发用户吗?谢谢!


//MAIN
public class MainServer {
    public static void main(String[] args) {
        EdgeController edgeController = new EdgeController();
        edgeController.connect();
    }
}


//EdgeController
public class EdgeController {

    public void connect() throws Exception {
        ServerBootstrap b = new ServerBootstrap();
        ChannelFuture f;


        try {
            b.group(new NioEventLoopGroup(), new NioEventLoopGroup())
                    .channel(NioServerSocketChannel.class)
                    .localAddress(9100)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new EdgeInitializer(new DefaultEventExecutorGroup(10)));


            // Start the server.
            f = b.bind().sync();

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();

        } finally { //Not quite sure how to get here yet... but no matter
            // Shut down all event loops to terminate all threads.
            b.shutdown();

        }
    }
}

//EdgeInitializer
public class EdgeInitializer  extends ChannelInitializer<SocketChannel> {
    private EventExecutorGroup executorGroup;

    public EdgeInitializer(EventExecutorGroup _executorGroup) {
        executorGroup = _executorGroup;
    }

    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();

        pipeline.addLast("idleStateHandler", new IdleStateHandler(200,0,0));
        pipeline.addLast("idleStateEventHandler", new EdgeIdleHandler());
        pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.nulDelimiter()));
        pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
        pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
        pipeline.addLast(this.executorGroup, "handler", new EdgeHandler());
    }    
}

//EdgeIdleHandler
public class EdgeIdleHandler extends ChannelHandlerAdapter {
    private static final Logger logger = Logger.getLogger( EdgeIdleHandler.class.getName());


    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception{
        if(evt instanceof IdleStateEvent) {
            ctx.close();
        }
    }

     private void trace(String msg) {
        logger.log(Level.INFO, msg);
    }

}

//DBController
public enum DBController {
    INSTANCE;

    private BoneCP connectionPool = null;
    private BoneCPConfig connectionPoolConfig = null;

    public boolean setupPool() {
        boolean ret = true;

        try {
            Class.forName("com.mysql.jdbc.Driver");

            connectionPoolConfig = new BoneCPConfig();
            connectionPoolConfig.setJdbcUrl("jdbc:mysql://" + DB_HOST + ":" + DB_PORT + "/" + DB_NAME);
            connectionPoolConfig.setUsername(DB_USER);
            connectionPoolConfig.setPassword(DB_PASS);

            try {
                connectionPool = new BoneCP(connectionPoolConfig);
            } catch(SQLException ex) {
                ret = false;
            }

        } catch(ClassNotFoundException ex) {
            ret = false;
        }

        return(ret);
    }

    public Connection getConnection() {
        Connection ret;

        try {
            ret = connectionPool.getConnection();
        } catch(SQLException ex) {
            ret = null;
        }

        return(ret);
    }
}

//EdgeHandler
public class EdgeHandler extends ChannelInboundMessageHandlerAdapter<String> {

    private final Charset CHARSET_UTF8 = Charset.forName("UTF-8");
    private long clientID;
    static final ChannelGroup channels = new DefaultChannelGroup();

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Connection dbConnection = null;
        Statement statement = null;
        ResultSet resultSet = null;
        String query;
        Boolean okToPlay = false;


        //Check if status for ID #1 is true
        try {
            query = "SELECT `Status` FROM `ServerTable` WHERE `ID` = 1";

            dbConnection = DBController.INSTANCE.getConnection();
            statement = dbConnection.createStatement();
            resultSet = statement.executeQuery(query);

            if (resultSet.first()) {
                if (resultSet.getInt("Status") > 0) {
                    okToPlay = true;
                }
            }
        } catch (SQLException ex) {
            okToPlay = false;
        } finally {
            if (resultSet != null) {
                try {
                    resultSet.close();
                } catch (SQLException logOrIgnore) {
                }
            }
            if (statement != null) {
                try {
                    statement.close();
                } catch (SQLException logOrIgnore) {
                }
            }
            if (dbConnection != null) {
                try {
                    dbConnection.close();
                } catch (SQLException logOrIgnore) {
                }
            }
        }

        if (okToPlay) {
            //clientID = setClientID();
            sendCommand(ctx, "HELLO", "WORLD");
        } else {
            sendErrorAndClose(ctx, "CLOSED");
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        channels.remove(ctx.channel());
    }

    @Override
    public void messageReceived(ChannelHandlerContext ctx, String request) throws Exception {
        // Generate and write a response.
        String[] segments_whitespace;
        String command, command_args;

        if (request.length() > 0) {

            segments_whitespace = request.split("\\s+");
            if (segments_whitespace.length > 1) {
                command = segments_whitespace[0];
                command_args = segments_whitespace[1];

                if (command.length() > 0 && command_args.length() > 0) {
                    switch (command) {
                        case "HOWDY":  processHowdy(ctx, command_args); break;
                        default:    break;
                    }
                }
            }
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        TraceUtils.severe("Unexpected exception from downstream - " + cause.toString());
        ctx.close();
    }

    /*                                      */
    /*       STATES  - / CLIENT SETUP       */
    /*                                      */
    private void processHowdy(ChannelHandlerContext ctx, String howdyTo) {
        Connection dbConnection = null;
        Statement statement = null;
        ResultSet resultSet = null;
        String replyBack = null;

        try {
            dbConnection = DBController.INSTANCE.getConnection();
            statement = dbConnection.createStatement();
            resultSet = statement.executeQuery("SELECT `to` FROM `ServerTable` WHERE `To`='" + howdyTo + "'");

            if (resultSet.first()) {
                replyBack = "you!";
            }
        } catch (SQLException ex) {
        } finally {
            if (resultSet != null) {
                try {
                    resultSet.close();
                } catch (SQLException logOrIgnore) {
                }
            }
            if (statement != null) {
                try {
                    statement.close();
                } catch (SQLException logOrIgnore) {
                }
            }
            if (dbConnection != null) {
                try {
                    dbConnection.close();
                } catch (SQLException logOrIgnore) {
                }
            }
        }

        if (replyBack != null) {
            sendCommand(ctx, "HOWDY", replyBack);
        } else {
            sendErrorAndClose(ctx, "ERROR");
        }
    }

    private boolean closePeer(ChannelHandlerContext ctx, long peerClientID) {
        boolean success = false;
        ChannelFuture future;

        for (Channel c : channels) {
            if (c != ctx.channel()) {
                if (c.pipeline().get(EdgeHandler.class).receiveClose(c, peerClientID)) {
                    success = true;
                    break;
                }
            }
        }

        return (success);

    }

    public boolean receiveClose(Channel thisChannel, long remoteClientID) {
        ChannelFuture future;
        boolean didclose = false;
        long thisClientID = (clientID == null ? 0 : clientID);

        if (remoteClientID == thisClientID) {
            future = thisChannel.write("CLOSED BY PEER" + '\n');
            future.addListener(ChannelFutureListener.CLOSE);

            didclose = true;
        }

        return (didclose);
    }


    private ChannelFuture sendCommand(ChannelHandlerContext ctx, String cmd, String outgoingCommandArgs) {
        return (ctx.write(cmd + " " + outgoingCommandArgs + '\n'));
    }

    private ChannelFuture sendErrorAndClose(ChannelHandlerContext ctx, String error_args) {

        ChannelFuture future = sendCommand(ctx, "ERROR", error_args);

        future.addListener(ChannelFutureListener.CLOSE);

        return (future);
    }
}
4

1 回答 1

2

当网络消息到达服务器时,它将被解码并释放一个 messageReceived 事件。

如果您查看您的管道,最后添加到管道的内容是执行程序。因为那个 executor 将接收已解码的内容并将释放 messageReceived 事件。

执行者是事件的处理器,服务器会告诉通过它们发生了哪些事件。所以如何使用执行器是一个重要的主题。如果只有一个执行器,并且因此,所有客户端都使用同一执行器,那么将有一个队列用于使用同一执行器。

当执行者较多时,事件的处理时间会减少,因为不会有任何等待空闲的执行者。

在您的代码中

新的 DefaultEventExecutorGroup(10)

意味着此 ServerBootstrap 在其整个生命周期中将仅使用 10 个执行程序。

在初始化新通道时,使用相同的执行器组:

pipeline.addLast(this.executorGroup, "handler", new EdgeHandler());

因此,每个新的客户端通道都将使用相同的执行程序组(10 个执行程序线程)。

如果 10 个线程能够正确处理传入的事件,那就足够了。但是,如果我们可以看到消息正在被解码/编码但没有快速作为事件处理,这意味着需要增加它们的数量。

我们可以像这样将执行者的数量从 10 增加到 100:

新的 DefaultEventExecutorGroup(100)

因此,如果有足够的 CPU 能力,这将更快地处理事件队列。

不应该为每个新频道创建新的执行者:

pipeline.addLast(new DefaultEventExecutorGroup(10), "handler", new EdgeHandler());

上一行是为每个新通道创建一个新的执行器组,这会大大减慢速度,例如如果有 3000 个客户端,则将有 3000 个执行器组(线程)。这消除了 NIO 的主要优势,即使用低线程量的能力。

我们可以在启动时创建 3000 个执行器,而不是为每个通道创建 1 个执行器,并且至少不会在每次客户端连接/断开连接时删除和创建它们。

.childHandler(new EdgeInitializer(new DefaultEventExecutorGroup(3000)));

上一行比为每个客户端创建 1 个执行器更可接受,因为所有客户端都连接到同一个 ExecutorGroup,并且当客户端断开连接时,即使删除了客户端数据,执行器仍然存在。

如果我们必须谈论数据库请求,一些数据库查询可能需要很长时间才能完成,所以如果有 10 个执行器并且有 10 个作业正在处理,第 11 个作业将不得不等到其他一个完成。如果服务器同时接收超过 10 个非常耗时的数据库作业,则这是一个瓶颈。增加执行者的数量将在一定程度上解决瓶颈。

于 2013-08-23T13:50:27.430 回答