我正在尝试创建一个 TCP 代理,该代理使用 Netty/Java 将请求转发到许多其他 TCP 端点。
例如:
/--> SERVER A
Client A --> PROXY --
\--> SERVER B
如果Client A
通过代理发送 TCP 命令,则代理会打开两个到Server A
和的 TCP 连接Server B
,同时代理发送Client A
给它们的请求。
如果Client A
随后发送另一个命令,则代理理论上已经将这两个连接缓存在一个池中,因此无需再次打开两个新连接,将请求代理到两个服务器。
关于响应处理,我想有两个选择:
- 一个接一个地显示两个响应
Client A
。 - 或者完全忽略响应。
如果连接丢失或关闭,代理应该能够自动重新创建它并将其添加回连接池。
我一直在看Netty的例子,并试图用它ChannelGroup
来处理连接池,但没有成功。此外,在我下面的代码中,在发送第一个请求后,代理停止工作。有小费吗?
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOption;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.util.LinkedList;
import java.util.List;
public class TcpProxyHandler extends ChannelInboundHandlerAdapter {
private static List<String> hosts = new LinkedList<>();
private static List<String> connected = new LinkedList<>();
static {
hosts.add("127.0.0.1:10000");
hosts.add("127.0.0.1:20000");
}
static final ChannelGroup channels = new DefaultChannelGroup(
GlobalEventExecutor.INSTANCE);
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
final Channel inboundChannel = ctx.channel();
for (String host : hosts) {
if (!connected.contains(host)) {
String address = host.split(":")[0];
int port = Integer.parseInt(host.split(":")[1]);
Channel outboundChannel = ConnectionPool.getConnection(address,
port);
if (outboundChannel == null) {
Bootstrap b = new Bootstrap();
b.group(inboundChannel.eventLoop())
.channel(ctx.channel().getClass())
.handler(new TcpProxyBackendHandler(inboundChannel))
.option(ChannelOption.AUTO_READ, false);
ChannelFuture f = b.connect(address, port);
outboundChannel = f.channel();
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future)
throws Exception {
if (future.isSuccess()) {
// connection complete start to read first data
inboundChannel.read();
} else {
// Close the connection if the connection
// attempt
// has failed.
inboundChannel.close();
}
}
});
channels.add(outboundChannel);
connected.add(host);
System.out.println("Connected to " + host);
}
}
}
}
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg)
throws Exception {
channels.flushAndWrite(msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
cause.printStackTrace();
ctx.close();
}
static void closeOnFlush(Channel ch) {
if (ch.isActive()) {
ch.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(
ChannelFutureListener.CLOSE);
}
}
static class TcpProxyBackendHandler extends ChannelInboundHandlerAdapter {
private final Channel inboundChannel;
public TcpProxyBackendHandler(Channel inboundChannel) {
this.inboundChannel = inboundChannel;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.read();
ctx.write(Unpooled.EMPTY_BUFFER);
}
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg)
throws Exception {
inboundChannel.writeAndFlush(msg).addListener(
new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future)
throws Exception {
if (future.isSuccess()) {
ctx.channel().read();
} else {
future.channel().close();
}
}
});
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
TcpProxyHandler.closeOnFlush(inboundChannel);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
cause.printStackTrace();
TcpProxyHandler.closeOnFlush(ctx.channel());
}
}
}