4

我正在尝试创建一个 TCP 代理,该代理使用 Netty/Java 将请求转发到许多其他 TCP 端点。

例如:

                     /--> SERVER A 
Client A --> PROXY --
                     \--> SERVER B 

如果Client A通过代理发送 TCP 命令,则代理会打开两个到Server A和的 TCP 连接Server B,同时代理发送Client A给它们的请求。

如果Client A随后发送另一个命令,则代理理论上已经将这两个连接缓存在一个池中,因此无需再次打开两个新连接,将请求代理到两个服务器。

关于响应处理,我想有两个选择:

  • 一个接一个地显示两个响应Client A
  • 或者完全忽略响应。

如果连接丢失或关闭,代理应该能够自动重新创建它并将其添加回连接池。

我一直在看Netty的例子,并试图用它ChannelGroup来处理连接池,但没有成功。此外,在我下面的代码中,在发送第一个请求后,代理停止工作。有小费吗?

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOption;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.util.LinkedList;
import java.util.List;

public class TcpProxyHandler extends ChannelInboundHandlerAdapter {

    private static List<String> hosts = new LinkedList<>();
    private static List<String> connected = new LinkedList<>();

    static {
        hosts.add("127.0.0.1:10000");
        hosts.add("127.0.0.1:20000");
    }

    static final ChannelGroup channels = new DefaultChannelGroup(
            GlobalEventExecutor.INSTANCE);

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        final Channel inboundChannel = ctx.channel();

        for (String host : hosts) {
            if (!connected.contains(host)) {
                String address = host.split(":")[0];
                int port = Integer.parseInt(host.split(":")[1]);
                Channel outboundChannel = ConnectionPool.getConnection(address,
                        port);
                if (outboundChannel == null) {
                    Bootstrap b = new Bootstrap();
                    b.group(inboundChannel.eventLoop())
                            .channel(ctx.channel().getClass())
                            .handler(new TcpProxyBackendHandler(inboundChannel))
                            .option(ChannelOption.AUTO_READ, false);
                    ChannelFuture f = b.connect(address, port);
                    outboundChannel = f.channel();
                    f.addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future)
                                throws Exception {
                            if (future.isSuccess()) {
                                // connection complete start to read first data
                                inboundChannel.read();
                            } else {
                                // Close the connection if the connection
                                // attempt
                                // has failed.
                                inboundChannel.close();
                            }
                        }
                    });

                    channels.add(outboundChannel);
                    connected.add(host);
                    System.out.println("Connected to " + host);
                }
            }

        }

    }

    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg)
            throws Exception {
        channels.flushAndWrite(msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    static void closeOnFlush(Channel ch) {
        if (ch.isActive()) {
            ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(
                    ChannelFutureListener.CLOSE);
        }
    }

    static class TcpProxyBackendHandler extends ChannelInboundHandlerAdapter {

        private final Channel inboundChannel;

        public TcpProxyBackendHandler(Channel inboundChannel) {
            this.inboundChannel = inboundChannel;
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            ctx.read();
            ctx.write(Unpooled.EMPTY_BUFFER);
        }

        @Override
        public void channelRead(final ChannelHandlerContext ctx, Object msg)
                throws Exception {
            inboundChannel.writeAndFlush(msg).addListener(
                    new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future)
                                throws Exception {
                            if (future.isSuccess()) {
                                ctx.channel().read();
                            } else {
                                future.channel().close();
                            }
                        }
                    });
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            TcpProxyHandler.closeOnFlush(inboundChannel);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                throws Exception {
            cause.printStackTrace();
            TcpProxyHandler.closeOnFlush(ctx.channel());
        }

    }

}
4

2 回答 2

0

如果您还没有尝试过,请再次启用 AUTO_READ 并删除对 read() 的手动调用。您也可以将自动读取设置为 false 来初始化您的服务器,您也可以尝试更改它。

于 2015-04-19T20:41:01.977 回答
0

您可以尝试在另一个线程中调用 connect() 和 read(),让 ChannelGrop 的工作人员有机会完成其工作。

于 2013-10-08T09:47:39.840 回答