感谢伟大的图书馆网络,我正在使用最新的 3.5.8 版本。我的应用程序设计如下
- 我有一个 Web 应用程序,它接收来自最终用户的请求(MyWebApp)
- 这个web应用修改接收到的数据,构造一个MyBean对象,连接到MyServer(建立在netty上)进行处理
- MyServer 执行一些业务逻辑并返回一个修改后的 MyBean 对象
- MyWebApp 通过 JSP 接收并移交给最终用户
MyServer 工作正常并且能够处理请求。
我观察到客户端在有并发请求时会延迟请求。尽管在几毫秒内处理了大量请求,但在某些情况下,很少有请求被延迟。延迟请求有时需要 1-4 分钟。但我观察到服务器很晚才收到这些请求,但会立即处理。所以,我只能怀疑客户端的实现。我试图实现“通道”的连接池,但请求和响应不匹配。由于这两种实现都是由我完成的,所以我可以自由地更改双方的代码。因此,我修改了服务器 messageReceived 源代码,即使在它处理了响应之后也不关闭连接。现在是客户端打开通道发出请求并在收到响应后关闭通道。
理想情况下,我想使用一个通道或通道池或任何其他实现并处理并发请求。我觉得 Netty 很容易用于服务器实现,但客户端实现不适合我这种设计。如果这可以成为 netty 库本身的一部分,那就太好了
任何想法,将不胜感激。
我的代码在客户端如下
public class NioConnectorImpl implements ICoreConnector{
private String host;
private int port;
private static ChannelFactory channelFactory;
public NioConnectorImpl(String host, int port){
this.host = host;
this.port = port;
}
@Override
public MyBean processRequest(MyBean bean) {
return (MyBean) callMyServer(bean);
}
public Object callMyServer(Object requestObject) throws RuntimeException{
if(channelFactory==null){
System.out.println("createFactory with 3 Boss Threads and "+Runtime.getRuntime().availableProcessors() * 2+ " Child Threads");
channelFactory = new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool(),3, Runtime.getRuntime().availableProcessors() * 2);
}
ClientBootstrap bootstrap = new ClientBootstrap(channelFactory);
// Set up the event pipeline factory.
bootstrap.setPipelineFactory(new MyChannelPipelineFactory(requestObject));
//Connection timeOut for initial connection...
bootstrap.setOption("connectTimeoutMillis", 10000);
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("keepAlive", true);
ChannelFuture connectFuture = bootstrap.connect(new InetSocketAddress(host, port));
// Wait until the connection is made successfully.
Channel channel = connectFuture.awaitUninterruptibly().getChannel();
// Get the handler instance to retrieve the answer.
MyChannelHandler handler = (MyChannelHandler) channel.getPipeline().getLast();
Object object = handler.getObjectBean();
channel.close().awaitUninterruptibly();
//bootstrap.releaseExternalResources();
//channelFactory.releaseExternalResources();
return object;
}
@Override
public void safeReleaseResources() {
System.out.println("Releasing External Resources::::::::::::");
if(channelFactory!=null)
channelFactory.releaseExternalResources();
try {
Thread.sleep(5000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
MyChannelHandler 代码
public class MyChannelHandler extends SimpleChannelUpstreamHandler{
private static final Logger logger = Logger.getLogger(MyChannelHandler.class.getName());
private final Object object;
final BlockingQueue<Object> answer = new LinkedBlockingQueue<Object>();
public MyChannelHandler(Object object) {
this.object= object;
}
public Object getObjectBean() {
boolean interrupted = false;
for (;;) {
try {
Object object = answer.take();
if (interrupted) {
Thread.currentThread().interrupt();
}
return object;
} catch (InterruptedException e) {
interrupted = true;
}
}
}
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
sendObject(e);
}
@Override
public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e) {
if (e.getMessage() != null) {
// Offer the answer after closing the connection.
e.getChannel().close().addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
boolean offered = answer.offer((Object) e.getMessage());
assert offered;
}
});
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
logger.log(Level.WARNING,"Naga: Unexpected exception from downstream.",e.getCause());
e.getChannel().close();
}
private void sendObject(ChannelStateEvent e) {
Channel channel = e.getChannel();
if(channel.isWritable()) channel.write(object);
}
}
我的服务器代码
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
Object object = MyServerController().handleRequest(e.getMessage(), e.getRemoteAddress().toString());
final ChannelFuture future = e.getChannel().write(object);
future.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture f) {
Channel ch = future.getChannel();
//ch.close(); // expect client closes the connection
}
});
}