I am creating a proxy with Netty framework but I am noticing that the last messages that is received tends to delay before passing to the next node.
Design:
Client|<---------->| Proxy |<------------>| Server
Basically, the issue comes when the server initiates a message their is a delay before passing it to the client or if the server sends a subsequent message right after the first message then first message goes through and the second message delays for some seconds. Why is this the case? Is there some configuration parameter that I am missing?
Startup.java
Executor executor = Executors.newCachedThreadPool();
ServerBootstrap sb = new ServerBootstrap(
new NioServerSocketChannelFactory(executor, executor));
// Set up the event pipeline factory.
ClientSocketChannelFactory cf =
new NioClientSocketChannelFactory(executor, executor);
sb.setPipelineFactory(
new ProxyPipelineFactory(cf, remoteHost, remotePort));
sb.setOption("child.tcpNoDelay", true);
sb.setOption("child.keepAlive", true);
// Start up the server.
sb.bind(new InetSocketAddress(localPort));
ProxyPipelineFactory.java
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline p = pipeline();
p.addLast("handler", new ClientHandler(cf, remoteHost, remotePort));
return p;
}
ClientHandler.java
@Override
public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
// Suspend incoming traffic until connected to the remote host.
final Channel inboundChannel = e.getChannel();
inboundChannel.setReadable(false);
// Start the connection attempt.
ClientBootstrap cb = new ClientBootstrap(cf);
cb.setOption("child.tcpNoDelay", true);
cb.setOption("child.keepAlive", true);
ChannelPipeline p = cb.getPipeline();
p.addLast("famer", new DelimiterBasedFrameDecoder(8192, false, new ChannelBuffer[]{ChannelBuffers.wrappedBuffer("</cmd>".getBytes())}));
p.addLast("handler", new ServerHandler(e.getChannel(), trafficLock));
ChannelFuture f = cb.connect(new InetSocketAddress(remoteHost, remotePort));
outboundChannel = f.getChannel();
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
// Connection attempt succeeded:
// Begin to accept incoming traffic.
inboundChannel.setReadable(true);
} else {
// Close the connection if the connection attempt has failed.
inboundChannel.close();
}
}
});
}
@Override
public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e)
throws Exception {
BigEndianHeapChannelBuffer msg = (BigEndianHeapChannelBuffer) e.getMessage();
if (log.isDebugEnabled()) {
byte[] bytes = new byte[msg.capacity()];
msg.readBytes(bytes);
msg.setIndex(0, bytes.length);
StringBuilder out = new StringBuilder("\nPROXY[ ").append(e.getChannel().getRemoteAddress()).append(" ---> Server ]");
out.append("\nMESSAGE length=").append(bytes.length).append("\n").append(new String(bytes));
log.debug(out.toString());
}
synchronized (trafficLock) {
outboundChannel.write(msg);
// If outboundChannel is saturated, do not read until notified in
// OutboundHandler.channelInterestChanged().
if (!outboundChannel.isWritable()) {
e.getChannel().setReadable(false);
}
}
}
ServerHandler.java
@Override
public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e)
throws Exception {
BigEndianHeapChannelBuffer msg = (BigEndianHeapChannelBuffer) e.getMessage();
proxy(e.getChannel(), msg);
}
private void proxy(Channel connection, ChannelBuffer raw) {
synchronized (trafficLock) {
inboundChannel.write(raw);
// If inboundChannel is saturated, do not read until notified in
// ClientHandler.channelInterestChanged().
if (!inboundChannel.isWritable()) {
connection.setReadable(false);
}
}
}