4

感谢伟大的图书馆网络,我正在使用最新的 3.5.8 版本。我的应用程序设计如下

  1. 我有一个 Web 应用程序,它接收来自最终用户的请求(MyWebApp)
  2. 这个web应用修改接收到的数据,构造一个MyBean对象,连接到MyServer(建立在netty上)进行处理
  3. MyServer 执行一些业务逻辑并返回一个修改后的 MyBean 对象
  4. MyWebApp 通过 JSP 接收并移交给最终用户

MyServer 工作正常并且能够处理请求。

我观察到客户端在有并发请求时会延迟请求。尽管在几毫秒内处理了大量请求,但在某些情况下,很少有请求被延迟。延迟请求有时需要 1-4 分钟。但我观察到服务器很晚才收到这些请求,但会立即处理。所以,我只能怀疑客户端的实现。我试图实现“通道”的连接池,但请求和响应不匹配。由于这两种实现都是由我完成的,所以我可以自由地更改双方的代码。因此,我修改了服务器 messageReceived 源代码,即使在它处理了响应之后也不关闭连接。现在是客户端打开通道发出请求并在收到响应后关闭通道。

理想情况下,我想使用一个通道或通道池或任何其他实现并处理并发请求。我觉得 Netty 很容易用于服务器实现,但客户端实现不适合我这种设计。如果这可以成为 netty 库本身的一部分,那就太好了

任何想法,将不胜感激。

我的代码在客户端如下

    public class NioConnectorImpl implements ICoreConnector{

private String host;
private int port;
private static ChannelFactory channelFactory;

public NioConnectorImpl(String host, int port){
    this.host = host;
    this.port = port;
}


@Override
public MyBean processRequest(MyBean bean) {
    return (MyBean) callMyServer(bean);
}


public Object callMyServer(Object requestObject) throws RuntimeException{


    if(channelFactory==null){
        System.out.println("createFactory with 3 Boss Threads and "+Runtime.getRuntime().availableProcessors() * 2+ " Child Threads");
        channelFactory  =  new NioClientSocketChannelFactory(
                Executors.newCachedThreadPool(),
                Executors.newCachedThreadPool(),3, Runtime.getRuntime().availableProcessors() * 2);
    }

ClientBootstrap bootstrap = new ClientBootstrap(channelFactory);

    // Set up the event pipeline factory.
    bootstrap.setPipelineFactory(new MyChannelPipelineFactory(requestObject));
    //Connection timeOut for initial connection...
    bootstrap.setOption("connectTimeoutMillis", 10000);
    bootstrap.setOption("tcpNoDelay", true);
    bootstrap.setOption("keepAlive", true);

    ChannelFuture connectFuture = bootstrap.connect(new InetSocketAddress(host, port));

// Wait until the connection is made successfully.
    Channel channel = connectFuture.awaitUninterruptibly().getChannel();

    // Get the handler instance to retrieve the answer.
    MyChannelHandler handler = (MyChannelHandler) channel.getPipeline().getLast();

    Object  object = handler.getObjectBean();

    channel.close().awaitUninterruptibly();

    //bootstrap.releaseExternalResources();
    //channelFactory.releaseExternalResources();

return object;

}

@Override
public void safeReleaseResources() {
    System.out.println("Releasing External Resources::::::::::::");
    if(channelFactory!=null)
    channelFactory.releaseExternalResources();

    try {
        Thread.sleep(5000L);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

}

}

MyChannelHandler 代码

    public class MyChannelHandler extends SimpleChannelUpstreamHandler{

    private static final Logger logger = Logger.getLogger(MyChannelHandler.class.getName());
    private final Object object;
    final BlockingQueue<Object> answer = new LinkedBlockingQueue<Object>();

    public MyChannelHandler(Object object) {
        this.object= object;
    }

    public Object getObjectBean() {
        boolean interrupted = false;
        for (;;) {
            try {
                Object object = answer.take();
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
                return object;
            } catch (InterruptedException e) {
                interrupted = true;
            }
        }
    }


    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
        sendObject(e);
    }



    @Override
    public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e) {
        if (e.getMessage() != null) {
            // Offer the answer after closing the connection.
            e.getChannel().close().addListener(new ChannelFutureListener() {
                public void operationComplete(ChannelFuture future) {
                    boolean offered = answer.offer((Object) e.getMessage());
                    assert offered;
                }
            });
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
        logger.log(Level.WARNING,"Naga: Unexpected exception from downstream.",e.getCause());
        e.getChannel().close();
    }

    private void sendObject(ChannelStateEvent e) {
        Channel channel = e.getChannel();
       if(channel.isWritable()) channel.write(object);
    }
}

我的服务器代码

     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {


    Object object = MyServerController().handleRequest(e.getMessage(), e.getRemoteAddress().toString());

    final ChannelFuture future =  e.getChannel().write(object);
         future.addListener(new ChannelFutureListener() {
                public void operationComplete(ChannelFuture f) {
                    Channel ch = future.getChannel();
                    //ch.close(); // expect client closes the connection
                }
         });
}
4

0 回答 0