我有一个相当简单的测试 Netty 服务器/客户端项目。我正在测试通信稳定性的某些方面,方法是用消息淹没服务器并计算我返回的消息和字节以确保一切匹配。
当我从客户端运行洪水时,客户端会跟踪它发送的消息数量以及返回的消息数量,然后当数量相等时,它会打印出一些统计信息。
在本地运行时的某些情况下(我猜是因为拥塞?)客户端永远不会最终打印出最终消息。当 2 个组件在远程机器上时,我没有遇到这个问题。任何建议,将不胜感激:
Encoder 只是一个简单的 OneToOneEncoder ,它将 Envelope 类型编码为 ChannelBuffer ,而 Decoder 是一个简单的 ReplayDecoder ,它做相反的事情。
我尝试向我的客户端处理程序添加一个 ChannelInterestChanged 方法,以查看通道的兴趣是否已更改为不读取,但似乎并非如此。
相关代码如下:
谢谢!
服务器
public class Server {
// configuration --------------------------------------------------------------------------------------------------
private final int port;
private ServerChannelFactory serverFactory;
// constructors ---------------------------------------------------------------------------------------------------
public Server(int port) {
this.port = port;
}
// public methods -------------------------------------------------------------------------------------------------
public boolean start() {
ExecutorService bossThreadPool = Executors.newCachedThreadPool();
ExecutorService childThreadPool = Executors.newCachedThreadPool();
this.serverFactory = new NioServerSocketChannelFactory(bossThreadPool, childThreadPool);
this.channelGroup = new DeviceIdAwareChannelGroup(this + "-channelGroup");
ChannelPipelineFactory pipelineFactory = new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("encoder", Encoder.getInstance());
pipeline.addLast("decoder", new Decoder());
pipeline.addLast("handler", new ServerHandler());
return pipeline;
}
};
ServerBootstrap bootstrap = new ServerBootstrap(this.serverFactory);
bootstrap.setOption("reuseAddress", true);
bootstrap.setOption("child.tcpNoDelay", true);
bootstrap.setOption("child.keepAlive", true);
bootstrap.setPipelineFactory(pipelineFactory);
Channel channel = bootstrap.bind(new InetSocketAddress(this.port));
if (!channel.isBound()) {
this.stop();
return false;
}
this.channelGroup.add(channel);
return true;
}
public void stop() {
if (this.channelGroup != null) {
ChannelGroupFuture channelGroupCloseFuture = this.channelGroup.close();
System.out.println("waiting for ChannelGroup shutdown...");
channelGroupCloseFuture.awaitUninterruptibly();
}
if (this.serverFactory != null) {
this.serverFactory.releaseExternalResources();
}
}
// main -----------------------------------------------------------------------------------------------------------
public static void main(String[] args) {
int port;
if (args.length != 3) {
System.out.println("No arguments found using default values");
port = 9999;
} else {
port = Integer.parseInt(args[1]);
}
final Server server = new Server( port);
if (!server.start()) {
System.exit(-1);
}
System.out.println("Server started on port 9999 ... ");
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
server.stop();
}
});
}
}
服务器处理程序
public class ServerHandler extends SimpleChannelUpstreamHandler {
// internal vars --------------------------------------------------------------------------------------------------
private AtomicInteger numMessagesReceived=new AtomicInteger(0);
// constructors ---------------------------------------------------------------------------------------------------
public ServerHandler() {
}
// SimpleChannelUpstreamHandler -----------------------------------------------------------------------------------
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
Channel c = e.getChannel();
System.out.println("ChannelConnected: channel id: " + c.getId() + ", remote host: " + c.getRemoteAddress() + ", isChannelConnected(): " + c.isConnected());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
System.out.println("*** EXCEPTION CAUGHT!!! ***");
e.getChannel().close();
}
@Override
public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
super.channelDisconnected(ctx, e);
System.out.println("*** CHANNEL DISCONNECTED ***");
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
if(numMessagesReceived.incrementAndGet()%1000==0 ){
System.out.println("["+numMessagesReceived+"-TH MSG]: Received message: " + e.getMessage());
}
if (e.getMessage() instanceof Envelope) {
// echo it...
if (e.getChannel().isWritable()) {
e.getChannel().write(e.getMessage());
}
} else {
super.messageReceived(ctx, e);
}
}
}
客户
public class Client implements ClientHandlerListener {
// configuration --------------------------------------------------------------------------------------------------
private final String host;
private final int port;
private final int messages;
// internal vars --------------------------------------------------------------------------------------------------
private ChannelFactory clientFactory;
private ChannelGroup channelGroup;
private ClientHandler handler;
private final AtomicInteger received;
private long startTime;
private ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
// constructors ---------------------------------------------------------------------------------------------------
public Client(String host, int port, int messages) {
this.host = host;
this.port = port;
this.messages = messages;
this.received = new AtomicInteger(0);
}
// ClientHandlerListener ------------------------------------------------------------------------------------------
@Override
public void messageReceived(Envelope message) {
if (this.received.incrementAndGet() == this.messages) {
long stopTime = System.currentTimeMillis();
float timeInSeconds = (stopTime - this.startTime) / 1000f;
System.err.println("Sent and received " + this.messages + " in " + timeInSeconds + "s");
System.err.println("That's " + (this.messages / timeInSeconds) + " echoes per second!");
}
}
// public methods -------------------------------------------------------------------------------------------------
public boolean start() {
// For production scenarios, use limited sized thread pools
this.clientFactory = new NioClientSocketChannelFactory(cachedThreadPool, cachedThreadPool);
this.channelGroup = new DefaultChannelGroup(this + "-channelGroup");
this.handler = new ClientHandler(this, this.channelGroup);
ChannelPipelineFactory pipelineFactory = new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("byteCounter", new ByteCounter("clientByteCounter"));
pipeline.addLast("encoder", Encoder.getInstance());
pipeline.addLast("decoder", new Decoder());
pipeline.addLast("handler", handler);
return pipeline;
}
};
ClientBootstrap bootstrap = new ClientBootstrap(this.clientFactory);
bootstrap.setOption("reuseAddress", true);
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("keepAlive", true);
bootstrap.setPipelineFactory(pipelineFactory);
boolean connected = bootstrap.connect(new InetSocketAddress(host, port)).awaitUninterruptibly().isSuccess();
System.out.println("isConnected: " + connected);
if (!connected) {
this.stop();
}
return connected;
}
public void stop() {
if (this.channelGroup != null) {
this.channelGroup.close();
}
if (this.clientFactory != null) {
this.clientFactory.releaseExternalResources();
}
}
public ChannelFuture sendMessage(Envelope env) {
Channel ch = this.channelGroup.iterator().next();
ChannelFuture cf = ch.write(env);
return cf;
}
private void flood() {
if ((this.channelGroup == null) || (this.clientFactory == null)) {
return;
}
System.out.println("sending " + this.messages + " messages");
this.startTime = System.currentTimeMillis();
for (int i = 0; i < this.messages; i++) {
this.handler.sendMessage(new Envelope(Version.VERSION1, Type.REQUEST, 1, new byte[1]));
}
}
// main -----------------------------------------------------------------------------------------------------------
public static void main(String[] args) throws InterruptedException {
final Client client = new Client("localhost", 9999, 10000);
if (!client.start()) {
System.exit(-1);
return;
}
while (client.channelGroup.size() == 0) {
Thread.sleep(200);
}
System.out.println("Client started...");
client.flood();
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
System.out.println("shutting down client");
client.stop();
}
});
}
}
客户处理程序
public class ClientHandler extends SimpleChannelUpstreamHandler {
// internal vars --------------------------------------------------------------------------------------------------
private final ClientHandlerListener listener;
private final ChannelGroup channelGroup;
private Channel channel;
// constructors ---------------------------------------------------------------------------------------------------
public ClientHandler(ClientHandlerListener listener, ChannelGroup channelGroup) {
this.listener = listener;
this.channelGroup = channelGroup;
}
// SimpleChannelUpstreamHandler -----------------------------------------------------------------------------------
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
if (e.getMessage() instanceof Envelope) {
Envelope env = (Envelope) e.getMessage();
this.listener.messageReceived(env);
} else {
System.out.println("NOT ENVELOPE!!");
super.messageReceived(ctx, e);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
System.out.println("**** CAUGHT EXCEPTION CLOSING CHANNEL ***");
e.getCause().printStackTrace();
e.getChannel().close();
}
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
this.channel = e.getChannel();
System.out.println("Server connected, channel id: " + this.channel.getId());
this.channelGroup.add(e.getChannel());
}
// public methods -------------------------------------------------------------------------------------------------
public void sendMessage(Envelope envelope) {
if (this.channel != null) {
this.channel.write(envelope);
}
}
}
客户端处理程序侦听器接口
public interface ClientHandlerListener {
void messageReceived(Envelope message);
}