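I have a fairly simple Netty server/client test project. I am testing some aspects of communication stability by flooding the server with messages and counting the messages and bytes I get back to make sure everything matches.

When I run the flood from the client, the client keeps track of how many messages it has sent and how many it has received back, and once the two counts are equal it prints some statistics.

In some cases when running locally (I'm guessing because of congestion?) the client never prints the final message. I have not run into this problem when the two components are on remote machines. Any suggestions would be appreciated: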

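The Encoder is just a simple OneToOneEncoder that encodes an Envelope type into a ChannelBuffer, and the Decoder is a simple ReplayingDecoder that does the opposite.

I tried adding a channelInterestChanged method to my client handler to see whether the channel's interest ops were being changed to exclude reads, but that did not seem to be the case.

The relevant code is below:

Thanks!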

Server

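public class Server {

    // configuration --------------------------------------------------------------------------------------------------
    private final int port;
    private ServerChannelFactory serverFactory;
    // holds the bound server channel so stop() can close it (assumes DeviceIdAwareChannelGroup implements ChannelGroup)
    private ChannelGroup channelGroup;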
    // constructors ---------------------------------------------------------------------------------------------------

    public Server(int port) {
        this.port = port;
    }


    // public methods -------------------------------------------------------------------------------------------------
    public boolean start() {
        ExecutorService bossThreadPool = Executors.newCachedThreadPool();
        ExecutorService childThreadPool = Executors.newCachedThreadPool();

        this.serverFactory = new NioServerSocketChannelFactory(bossThreadPool, childThreadPool);
        this.channelGroup = new DeviceIdAwareChannelGroup(this + "-channelGroup");
        ChannelPipelineFactory pipelineFactory = new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() throws Exception {
                ChannelPipeline pipeline = Channels.pipeline();
                pipeline.addLast("encoder", Encoder.getInstance());
                pipeline.addLast("decoder", new Decoder());
                pipeline.addLast("handler", new ServerHandler());
                return pipeline;
            }
        };

        ServerBootstrap bootstrap = new ServerBootstrap(this.serverFactory);
        bootstrap.setOption("reuseAddress", true);
        bootstrap.setOption("child.tcpNoDelay", true);
        bootstrap.setOption("child.keepAlive", true);
        bootstrap.setPipelineFactory(pipelineFactory);

        Channel channel = bootstrap.bind(new InetSocketAddress(this.port));
        if (!channel.isBound()) {
            this.stop();
            return false;
        }

        this.channelGroup.add(channel);
        return true;
    }

    public void stop() {
        if (this.channelGroup != null) {
            ChannelGroupFuture channelGroupCloseFuture = this.channelGroup.close();
            System.out.println("waiting for ChannelGroup shutdown...");
            channelGroupCloseFuture.awaitUninterruptibly();
        }
        if (this.serverFactory != null) {
            this.serverFactory.releaseExternalResources();
        }
    }

    // main -----------------------------------------------------------------------------------------------------------
    public static void main(String[] args) {
        int port;
        if (args.length != 3) {
            System.out.println("No arguments found using default values");
            port = 9999;
        } else {
            port = Integer.parseInt(args[1]);
        }

        final Server server = new Server( port);

        if (!server.start()) {
            System.exit(-1);
        }
        System.out.println("Server started on port " + port + " ... ");
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                server.stop();
            }
        });
    }
}

Server handler

public class ServerHandler extends SimpleChannelUpstreamHandler {

    // internal vars --------------------------------------------------------------------------------------------------

    private AtomicInteger numMessagesReceived=new AtomicInteger(0);

    // constructors ---------------------------------------------------------------------------------------------------
    public ServerHandler() {
    }

    // SimpleChannelUpstreamHandler -----------------------------------------------------------------------------------
    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        Channel c = e.getChannel();
        System.out.println("ChannelConnected: channel id: " + c.getId() + ", remote host: " + c.getRemoteAddress() + ", isChannelConnected(): " + c.isConnected());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        System.out.println("*** EXCEPTION CAUGHT!!! ***");
        e.getChannel().close();
    }

    @Override
    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        super.channelDisconnected(ctx, e);
        System.out.println("*** CHANNEL DISCONNECTED ***");

    }

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        if(numMessagesReceived.incrementAndGet()%1000==0 ){
             System.out.println("["+numMessagesReceived+"-TH MSG]: Received message: " + e.getMessage());
        }

        if (e.getMessage() instanceof Envelope) {
                // echo it...
                if (e.getChannel().isWritable()) {
                    e.getChannel().write(e.getMessage());
                }
        } else {
            super.messageReceived(ctx, e);
        }
    }
}

Client

public class Client implements ClientHandlerListener {

    // configuration --------------------------------------------------------------------------------------------------
    private final String host;
    private final int port;
    private final int messages;
    // internal vars --------------------------------------------------------------------------------------------------
    private ChannelFactory clientFactory;
    private ChannelGroup channelGroup;
    private ClientHandler handler;
    private final AtomicInteger received;
    private long startTime;
    private ExecutorService cachedThreadPool = Executors.newCachedThreadPool();

    // constructors ---------------------------------------------------------------------------------------------------
    public Client(String host, int port, int messages) {
        this.host = host;
        this.port = port;
        this.messages = messages;
        this.received = new AtomicInteger(0);
    }

    // ClientHandlerListener ------------------------------------------------------------------------------------------
    @Override
    public void messageReceived(Envelope message) {
        if (this.received.incrementAndGet() == this.messages) {
            long stopTime = System.currentTimeMillis();
            float timeInSeconds = (stopTime - this.startTime) / 1000f;
            System.err.println("Sent and received " + this.messages + " in " + timeInSeconds + "s");
            System.err.println("That's " + (this.messages / timeInSeconds) + " echoes per second!");
        }
    }

    // public methods -------------------------------------------------------------------------------------------------
    public boolean start() {

        // For production scenarios, use limited sized thread pools
        this.clientFactory = new NioClientSocketChannelFactory(cachedThreadPool, cachedThreadPool);
        this.channelGroup = new DefaultChannelGroup(this + "-channelGroup");
        this.handler = new ClientHandler(this, this.channelGroup);
        ChannelPipelineFactory pipelineFactory = new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() throws Exception {
                ChannelPipeline pipeline = Channels.pipeline();
                pipeline.addLast("byteCounter", new ByteCounter("clientByteCounter"));
                pipeline.addLast("encoder", Encoder.getInstance());
                pipeline.addLast("decoder", new Decoder());
                pipeline.addLast("handler", handler);
                return pipeline;
            }
        };

        ClientBootstrap bootstrap = new ClientBootstrap(this.clientFactory);
        bootstrap.setOption("reuseAddress", true);
        bootstrap.setOption("tcpNoDelay", true);
        bootstrap.setOption("keepAlive", true);
        bootstrap.setPipelineFactory(pipelineFactory);

        boolean connected = bootstrap.connect(new InetSocketAddress(host, port)).awaitUninterruptibly().isSuccess();
        System.out.println("isConnected: " + connected);
        if (!connected) {
            this.stop();
        }

        return connected;
    }

    public void stop() {
        if (this.channelGroup != null) {
            this.channelGroup.close();
        }
        if (this.clientFactory != null) {
            this.clientFactory.releaseExternalResources();
        }
    }

    public ChannelFuture sendMessage(Envelope env) {
        Channel ch = this.channelGroup.iterator().next();
        ChannelFuture cf = ch.write(env);
        return cf;
    }

    private void flood() {
        if ((this.channelGroup == null) || (this.clientFactory == null)) {
            return;
        }

        System.out.println("sending " + this.messages + " messages");
        this.startTime = System.currentTimeMillis();
        for (int i = 0; i < this.messages; i++) {

            this.handler.sendMessage(new Envelope(Version.VERSION1, Type.REQUEST, 1, new byte[1]));
        }
    }
    // main -----------------------------------------------------------------------------------------------------------

    public static void main(String[] args) throws InterruptedException {
        final Client client = new Client("localhost", 9999, 10000);

        if (!client.start()) {
            System.exit(-1);
            return;
        }
        while (client.channelGroup.size() == 0) {
            Thread.sleep(200);
        }
        System.out.println("Client started...");

        client.flood();


        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                System.out.println("shutting down client");
                client.stop();
            }
        });


    }
}

Client handler

public class ClientHandler extends SimpleChannelUpstreamHandler {
    // internal vars --------------------------------------------------------------------------------------------------
    private final ClientHandlerListener listener;
    private final ChannelGroup channelGroup;
    private Channel channel;

    // constructors ---------------------------------------------------------------------------------------------------
    public ClientHandler(ClientHandlerListener listener, ChannelGroup channelGroup) {
        this.listener = listener;
        this.channelGroup = channelGroup;
    }

    // SimpleChannelUpstreamHandler -----------------------------------------------------------------------------------

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        if (e.getMessage() instanceof Envelope) {
            Envelope env = (Envelope) e.getMessage();
            this.listener.messageReceived(env);
        } else {
            System.out.println("NOT ENVELOPE!!");
            super.messageReceived(ctx, e);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        System.out.println("**** CAUGHT EXCEPTION CLOSING CHANNEL ***");
        e.getCause().printStackTrace();
        e.getChannel().close();
    }

    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        this.channel = e.getChannel();
        System.out.println("Server connected, channel id: " + this.channel.getId());
        this.channelGroup.add(e.getChannel());
    }

    // public methods -------------------------------------------------------------------------------------------------
    public void sendMessage(Envelope envelope) {
        if (this.channel != null) {
            this.channel.write(envelope);
        }
    }
}

Client handler listener interface

public interface ClientHandlerListener {

    void messageReceived(Envelope message);
}
1 Answer

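Without knowing how big an Envelope is on the wire, my guess is that your problem is that the client writes 10,000 messages without ever checking whether the channel is writable.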

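Netty 3.x processes network events and writes in a particular way. It is possible that your client is writing so much data so quickly that Netty does not get a chance to process receive events. On the server side, this would cause the channel to become unwritable, and your handler then drops the reply.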
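To make the effect of a dropped reply concrete, here is a minimal JDK-only model of the server handler's "echo only if writable" branch. It is a sketch, not Netty code: `DroppedEchoModel`, `Result`, and the message-count `highWaterMark` are hypothetical names, and it assumes the whole burst arrives before the socket drains any replies. Netty's real writability threshold is measured in bytes rather than messages.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** JDK-only model of an echo handler that skips the reply when its channel is not writable. */
final class DroppedEchoModel {

    /** Counts gathered from one simulated burst. */
    static final class Result {
        final int echoed;
        final int dropped;

        Result(int echoed, int dropped) {
            this.echoed = echoed;
            this.dropped = dropped;
        }
    }

    /**
     * Simulates a burst of requests that arrives before any reply is drained.
     * highWaterMark is a hypothetical limit on queued replies (an assumption of this model).
     */
    static Result run(int messages, int highWaterMark) {
        Deque<Integer> outbound = new ArrayDeque<>();
        int dropped = 0;
        for (int i = 0; i < messages; i++) {
            boolean writable = outbound.size() < highWaterMark;
            if (writable) {
                outbound.add(i); // reply is queued and will eventually reach the client
            } else {
                dropped++;       // same as the handler in the question: no write, no error, no log
            }
        }
        return new Result(outbound.size(), dropped);
    }

    public static void main(String[] args) {
        Result r = run(10_000, 64);
        System.out.println("echoed=" + r.echoed + " dropped=" + r.dropped);
    }
}
```

With these inputs the model prints `echoed=64 dropped=9936`. A client that waits for its received count to equal its sent count would therefore never reach its final print, which matches the symptom described in the question.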

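There are a few possible reasons why you only see the problem on localhost, but the most likely one is that the write bandwidth there is far higher than over a real network. The client never checks whether the channel is writable, so over a network Netty buffers your messages until the network catches up (if you wrote significantly more than 10,000 messages you might see an OutOfMemoryError). This acts as a natural brake: Netty suspends writing until the network is ready, which gives it time to process incoming data and keeps the server from ever seeing an unwritable channel.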

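The DiscardClientHandler in the discard example shows how to test whether the channel is writable and how to resume once it becomes writable again. Another option is to have sendMessage return the ChannelFuture associated with the write and, if the channel is not writable after the write, block until that future completes.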
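The second option can be sketched without Netty on the classpath. In the JDK-only model below, `ModelChannel` is a hypothetical stand-in for a channel with a bounded write queue, `CompletableFuture` plays the role of the ChannelFuture, and a daemon thread plays the I/O thread. None of these names come from Netty. In real code, blocking like this is only appropriate from a non-I/O thread, such as the main thread that runs flood() in the question.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** JDK-only model of a sender that waits on the write future once the channel is not writable. */
final class ThrottledSenderModel {

    /** Hypothetical stand-in for a channel with a bounded write queue; not Netty's API. */
    static final class ModelChannel {
        private final int highWaterMark;
        private final Queue<CompletableFuture<Void>> pending = new ArrayDeque<>();
        private int maxPending;
        private int flushed;

        ModelChannel(int highWaterMark) {
            this.highWaterMark = highWaterMark;
        }

        /** Queues a message and returns a future that completes once it has been flushed. */
        synchronized CompletableFuture<Void> write(byte[] message) {
            CompletableFuture<Void> future = new CompletableFuture<>();
            pending.add(future);
            maxPending = Math.max(maxPending, pending.size());
            return future;
        }

        synchronized boolean isWritable() {
            return pending.size() < highWaterMark;
        }

        /** Flushes the oldest queued message, if any; called by the simulated I/O thread. */
        boolean flushOne() {
            CompletableFuture<Void> future;
            synchronized (this) {
                future = pending.poll();
                if (future != null) {
                    flushed++;
                }
            }
            if (future == null) {
                return false;
            }
            future.complete(null);
            return true;
        }

        synchronized int maxPending() { return maxPending; }

        synchronized int flushed() { return flushed; }
    }

    /** Sends one-byte payloads, pausing whenever the queue reaches the high-water mark. */
    static ModelChannel flood(int messages, int highWaterMark) {
        ModelChannel channel = new ModelChannel(highWaterMark);
        Thread io = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                if (!channel.flushOne()) {
                    Thread.yield();
                }
            }
        });
        io.setDaemon(true);
        io.start();

        CompletableFuture<Void> last = CompletableFuture.completedFuture(null);
        for (int i = 0; i < messages; i++) {
            last = channel.write(new byte[1]);
            if (!channel.isWritable()) {
                last.join(); // block until this write and every write queued before it is flushed
            }
        }
        last.join();         // wait for the tail of the burst
        io.interrupt();
        return channel;
    }
}
```

Calling `ThrottledSenderModel.flood(10_000, 64)` flushes all 10,000 messages while the queue never grows past 64 entries, so memory use stays bounded no matter how large the burst is.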

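Also, your server handler should write the message first and then check whether the channel is writable. If it is not, set the channel's readable flag to false. Netty fires channelInterestChanged when the channel becomes writable again; at that point you can set readable back to true to resume reading messages.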
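The write-then-suspend pattern can be modelled in plain Java. This is a sketch under stated assumptions, not a Netty handler: `ReadSuspendingEchoModel` and its message-count `highWaterMark` and `lowWaterMark` are hypothetical, and a single loop stands in for both the inbound stream and the socket draining replies. The method names mirror the handler callbacks they represent.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** JDK-only model of an echo handler that suspends reads instead of dropping replies. */
final class ReadSuspendingEchoModel {
    private final int highWaterMark; // hypothetical: queued replies at which the channel stops being writable
    private final int lowWaterMark;  // hypothetical: queued replies at which it becomes writable again
    private final Deque<Integer> outbound = new ArrayDeque<>();
    private boolean writable = true;
    private boolean readable = true;
    int echoed;
    int readSuspensions;

    ReadSuspendingEchoModel(int highWaterMark, int lowWaterMark) {
        this.highWaterMark = highWaterMark;
        this.lowWaterMark = lowWaterMark;
    }

    /** Handler logic: always write the echo, then stop reading if that write filled the queue. */
    void messageReceived(int message) {
        outbound.add(message);
        if (outbound.size() >= highWaterMark) {
            writable = false;
        }
        if (!writable && readable) {
            readable = false;
            readSuspensions++;
        }
    }

    /** Handler logic: resume reading once the channel is writable again. */
    void channelInterestChanged() {
        if (writable && !readable) {
            readable = true;
        }
    }

    /** Simulated socket: sends one queued reply and signals the handler at the low-water mark. */
    void drainOne() {
        if (outbound.poll() != null) {
            echoed++;
        }
        if (!writable && outbound.size() <= lowWaterMark) {
            writable = true;
            channelInterestChanged();
        }
    }

    /** Feeds the given number of requests through the model and returns it for inspection. */
    static ReadSuspendingEchoModel run(int messages, int highWaterMark, int lowWaterMark) {
        ReadSuspendingEchoModel m = new ReadSuspendingEchoModel(highWaterMark, lowWaterMark);
        int next = 0;
        while (next < messages || !m.outbound.isEmpty()) {
            if (m.readable && next < messages) {
                m.messageReceived(next++); // inbound data is delivered only while reads are enabled
            } else {
                m.drainOne();              // otherwise the socket gets a chance to catch up
            }
        }
        return m;
    }
}
```

Running `ReadSuspendingEchoModel.run(10_000, 64, 32)` echoes every one of the 10,000 messages. Reads are paused whenever the reply queue fills, so the pressure is pushed back onto the sender rather than resolved by discarding replies.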

Answered 2012-10-11T13:50:03.270