I am creating a proxy with the Netty framework, but I am noticing that the last message received tends to be delayed before it is passed to the next node.

Design:

Client|<---------->| Proxy |<------------>| Server

Basically, the issue arises when the server initiates a message: there is a delay before it is passed to the client. Likewise, if the server sends a second message right after the first, the first message goes through but the second is delayed for several seconds. Why is this the case? Is there some configuration parameter that I am missing?

Startup.java

        Executor executor = Executors.newCachedThreadPool();
        ServerBootstrap sb = new ServerBootstrap(
                new NioServerSocketChannelFactory(executor, executor));

        // Set up the event pipeline factory.
        ClientSocketChannelFactory cf =
                new NioClientSocketChannelFactory(executor, executor);

        sb.setPipelineFactory(
                new ProxyPipelineFactory(cf, remoteHost, remotePort));

        sb.setOption("child.tcpNoDelay", true);
        sb.setOption("child.keepAlive", true);

        // Start up the server.
        sb.bind(new InetSocketAddress(localPort));

ProxyPipelineFactory.java

@Override
public ChannelPipeline getPipeline() throws Exception {
    ChannelPipeline p = pipeline(); 
    p.addLast("handler", new ClientHandler(cf, remoteHost, remotePort));
    return p;
}

ClientHandler.java

@Override
public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
        throws Exception {
    // Suspend incoming traffic until connected to the remote host.
    final Channel inboundChannel = e.getChannel();
    inboundChannel.setReadable(false);

    // Start the connection attempt.
    ClientBootstrap cb = new ClientBootstrap(cf);

    // ClientBootstrap options apply to the channel itself, so they take no "child." prefix.
    cb.setOption("tcpNoDelay", true);
    cb.setOption("keepAlive", true);
    ChannelPipeline p = cb.getPipeline();
    p.addLast("framer", new DelimiterBasedFrameDecoder(8192, false, new ChannelBuffer[]{ChannelBuffers.wrappedBuffer("</cmd>".getBytes())}));
    p.addLast("handler", new ServerHandler(e.getChannel(), trafficLock));
    ChannelFuture f = cb.connect(new InetSocketAddress(remoteHost, remotePort));

    outboundChannel = f.getChannel();
    f.addListener(new ChannelFutureListener() {

        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
                // Connection attempt succeeded:
                // Begin to accept incoming traffic.
                inboundChannel.setReadable(true);
            } else {
                // Close the connection if the connection attempt has failed.
                inboundChannel.close();
            }
        }
    });
}

@Override
public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e)
        throws Exception {

    // Cast to the ChannelBuffer interface rather than one concrete implementation.
    ChannelBuffer msg = (ChannelBuffer) e.getMessage();

    if (log.isDebugEnabled()) {
        // Copy only the readable bytes; getBytes() leaves the buffer's indexes untouched.
        byte[] bytes = new byte[msg.readableBytes()];
        msg.getBytes(msg.readerIndex(), bytes);
        StringBuilder out = new StringBuilder("\nPROXY[ ").append(e.getChannel().getRemoteAddress()).append(" ---> Server ]");
        out.append("\nMESSAGE length=").append(bytes.length).append("\n").append(new String(bytes));
        log.debug(out.toString());
    }

    synchronized (trafficLock) {
        outboundChannel.write(msg);
        // If outboundChannel is saturated, do not read until notified in
        // OutboundHandler.channelInterestChanged().
        if (!outboundChannel.isWritable()) {
            e.getChannel().setReadable(false);
        }
    }
}

ServerHandler.java

@Override
public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e)
        throws Exception {
    // Cast to the ChannelBuffer interface rather than one concrete implementation.
    ChannelBuffer msg = (ChannelBuffer) e.getMessage();
    proxy(e.getChannel(), msg);
}
private void proxy(Channel connection, ChannelBuffer raw) {
    synchronized (trafficLock) {
        inboundChannel.write(raw);
        // If inboundChannel is saturated, do not read until notified in
        // ClientHandler.channelInterestChanged().
        if (!inboundChannel.isWritable()) {
            connection.setReadable(false);
        }
    }
}

1 Answer

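You disabled readability on the connection; where do you turn it back on so that more bytes can be read? You can look at the Proxy Server example on netty.io, which does exactly the same thing.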

private void proxy(Channel connection, ChannelBuffer raw) {
    synchronized (trafficLock) {
        inboundChannel.write(raw);
        // If inboundChannel is saturated, do not read until notified in
        // ClientHandler.channelInterestChanged().
        if (!inboundChannel.isWritable()) {
            connection.setReadable(false);
        }
    }
}
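
To make the missing half concrete, here is a minimal sketch of the pause/resume pattern. It is a stdlib-only model, not Netty code: `FakeChannel`, `forward`, and `onDestinationInterestChanged` are hypothetical names, and the single `highWaterMark` threshold is a simplification. In the posted code, the corresponding place would presumably be the `channelInterestChanged()` callback that the comments in `proxy()` already mention.

```java
/** Stdlib-only model of the proxy's flow control; every name here is hypothetical, not Netty API. */
final class BackpressureSketch {

    /** Minimal stand-in for a channel: tracks queued bytes and a readable flag. */
    static final class FakeChannel {
        private final int highWaterMark;   // assumed saturation threshold
        private int pendingBytes;          // bytes written but not yet flushed
        private boolean readable = true;

        FakeChannel(int highWaterMark) { this.highWaterMark = highWaterMark; }

        void write(int bytes) { pendingBytes += bytes; }
        boolean isWritable() { return pendingBytes < highWaterMark; }
        void setReadable(boolean readable) { this.readable = readable; }
        boolean isReadable() { return readable; }

        /** Simulates the socket draining its write queue. */
        void flush() { pendingBytes = 0; }
    }

    private final Object trafficLock = new Object();
    private final FakeChannel source;       // channel the proxy reads from
    private final FakeChannel destination;  // channel the proxy writes to

    BackpressureSketch(FakeChannel source, FakeChannel destination) {
        this.source = source;
        this.destination = destination;
    }

    /** Mirrors proxy(): forward data and pause reads if the destination is saturated. */
    void forward(int bytes) {
        synchronized (trafficLock) {
            destination.write(bytes);
            if (!destination.isWritable()) {
                source.setReadable(false);
            }
        }
    }

    /** The half to look for: resume reads once the destination is writable again. */
    void onDestinationInterestChanged() {
        synchronized (trafficLock) {
            if (destination.isWritable()) {
                source.setReadable(true);
            }
        }
    }

    public static void main(String[] args) {
        FakeChannel server = new FakeChannel(64);
        FakeChannel client = new FakeChannel(64);
        BackpressureSketch proxy = new BackpressureSketch(server, client);

        proxy.forward(100);                    // saturates the client-side channel
        System.out.println("readable after saturation: " + server.isReadable()); // false

        client.flush();                        // the client drained its queue
        proxy.onDestinationInterestChanged();  // without this call, reads stay paused
        System.out.println("readable after drain: " + server.isReadable());      // true
    }
}
```

If nothing plays the role of `onDestinationInterestChanged()`, the source channel stays unreadable after the first saturation, which would look like messages stalling in the proxy.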
answered 2012-07-27T08:58:38.850