这样的事情可以用netty来完成。
我认为代理示例是一个很好的起点:
https://github.com/netty/netty/tree/3/src/main/java/org/jboss/netty/example/proxy
我已经修改了代理示例以实现如上所述的服务器。设计:使用netty-3.5.3
客户端<->|(s-soc)-MUX_NIO_server-(c-socks)|<->server1(lb)
|<->server2
|<->server3
|<->server4
pipeline=>framedecoder-->stringdecoder-->clienthandler-->framedecoder-->stringdecoder-->serverHandler--|
它运行 100% 直到达到 +/- 100 tps,然后来自客户端的相同消息一遍又一遍地发送到服务器,看起来像死锁情况。
在这两个处理程序中,我的 channelInterestChanged 事件如下所示:
//channelInterestChanged 同步 (trafficLock) {
if (e.getChannel().isWritable()) {
inboundChannel.setReadable(true);
if (inboundChannel != null) {
inboundChannel.setReadable(true);
}
}
}
在两个处理程序消息 rx 中,我这样写:
同步(交通锁){
final ChannelFutureListener writeListener =
new ChannelFutureListener() {
@Override
public void operationComplete(final ChannelFuture future)
throws Exception {
if (Consts.DEBUG_ENABLED) {
log.debug("Finished Writing message);
}
}
};
/* if channel is write */
if (inboundChannel.isWritable()) {
inboundChannel.write(bufferMsg).addListener(writeListener);
}
// If inboundChannel is saturated, do not read until notified in
if (!inboundChannel.isWritable()) {
e.getChannel().setReadable(false);
}
}
知道可能出了什么问题以及在哪里解决这个问题吗?谢谢