3

我的应用程序使用嵌入式 Jetty 作为 http 接口,它似乎在正常运行 2-3 周后挂起。经检查,有很多休眠线程,所以我用 jvisualvm 在本地运行它,然后用 pyplot 和一堆简单的 get 敲出来,发现线程的数量总是在增加,其中大部分都在休眠。

我的代码专门处理带有异步侦听器的异步上下文的异步请求,并且基本上间歇性地休眠,直到它找到由主应用程序填充的结果哈希结果。如果结果从未出现,则请求超时。

一位同事建议我可能没有正确关闭请求。是这样吗?我在下面附上了我的代码以供您批评(在 jvisualvm 情节之后)。谢谢!

虚拟机

码头服务器:

public class APIRequestHandler extends HttpServlet
{
    private static final long serialVersionUID = -446741424265391138L;

    private ConcurrentLinkedQueue<APIMessage> queue;
    private ConcurrentHashMap<String, String> collector;
    private int timeout;
    private Logger logger;
    private Selector selector;

    public APIRequestHandler(Logger logger, Selector selector, ConcurrentLinkedQueue<APIMessage> queue,
            ConcurrentHashMap<String, String> collector, int timeout)
    {
        this.logger = logger;
        this.selector = selector;
        this.queue = queue;
        this.collector = collector;
        this.timeout = timeout;
    }

    public void doGet(HttpServletRequest req, HttpServletResponse res)
            throws java.io.IOException
            {

        String rid = req.getParameter("rid");
        String targetId = req.getParameter("id");
        String msg = req.getParameter("json");

        logger.info("requestId: "+rid);
        logger.info("targetId: "+targetId);
        logger.info("json: "+msg);

        // Send to channel
        APIMessage m = new APIMessage();
        m.rid = rid;
        m.id = targetId;
        m.json = msg + "\r\n";

        // create the async context, otherwise getAsyncContext() will be null
        final AsyncContext ctx = req.startAsync();

        // set the timeout
        ctx.setTimeout(this.timeout);
        ctx.getRequest().setAttribute("rid",rid);
        ctx.getRequest().setAttribute("startTime",System.currentTimeMillis()); // start time

        // set up selector/queue
        queue.add(m);
        this.selector.wakeup();

        // attach listener to respond to lifecycle events of this AsyncContext
        ctx.addListener(new AsyncListener() {
            public void onComplete(AsyncEvent event) throws IOException {
                log("onComplete called");

                // NOTE: errors and timeouts are handled with other events.
                // Successful comm with plug handled by AsyncHTTPRequestProcessor.

                ServletRequest asyncContextReq = event.getAsyncContext().getRequest();

                // Do the response
                HttpServletResponse ctxRes = (HttpServletResponse)event.getAsyncContext().getResponse();

                int httpStatusCode = (Integer) asyncContextReq.getAttribute("status");
                long startTime = (Long) asyncContextReq.getAttribute("startTime");
                long endTime = (Long) asyncContextReq.getAttribute("endTime");
                long elapsedTime = endTime - startTime;

                ctxRes.setStatus(httpStatusCode);
                ctxRes.setContentType("application/json");

                ctxRes.getWriter().println("{");
                ctxRes.getWriter().println("\"rid\":"+"\""+ctx.getRequest().getParameter("rid")+"\"");
                ctxRes.getWriter().println(",");
                ctxRes.getWriter().println("\"rtime\":"+elapsedTime); // milliseconds
                ctxRes.getWriter().println(",");
                ctxRes.getWriter().println("\"startTime\":"+"\""+startTime/1000L+"\""); // unix time
                ctxRes.getWriter().println(",");
                ctxRes.getWriter().println("\"response\": "+event.getAsyncContext().getRequest().getAttribute("response"));

                ctxRes.getWriter().println("}");
                ctxRes.getWriter().flush();

            }
            public void onTimeout(AsyncEvent event) throws IOException {
                log("onTimeout called");

                // gateway timeout (on plug request)
                ServletRequest asyncContextReq = event.getAsyncContext().getRequest();
                asyncContextReq.setAttribute("endTime",System.currentTimeMillis());
                asyncContextReq.setAttribute("status",504); // request timeout
                asyncContextReq.setAttribute("response", "{}");
            }
            public void onError(AsyncEvent event) throws IOException {
                log("onError called");

                ServletRequest asyncContextReq = event.getAsyncContext().getRequest();
                asyncContextReq.setAttribute("endTime",System.currentTimeMillis());
                asyncContextReq.setAttribute("status",500); // request timeout
                asyncContextReq.setAttribute("response", "{}");
            }
            public void onStartAsync(AsyncEvent event) throws IOException {
                log("onStartAsync called");
            }
        });

        // spawn some task in a background thread
        ctx.start(new AsyncHTTPRequestProcessor(ctx,collector,logger));
    }
}

异步 HTTP 请求处理器:

public class AsyncHTTPRequestProcessor implements Runnable {

    private ConcurrentHashMap<String, String> collector;
    private Logger logger;
    private AsyncContext ctx;
        //Defined this here because of strange behaviour when running junit
        //tests and the response json string being empty...
    private String responseStr = null;

    public AsyncHTTPRequestProcessor(AsyncContext _ctx, 
            ConcurrentHashMap<String, String> _collector, Logger _logger) {
        ctx = _ctx;
        collector = _collector;
        logger = _logger;
    }

    @Override
    public void run() {

        String rid = (String) ctx.getRequest().getAttribute("rid");
        int elapsed = 0;

        while(elapsed<1000)
        {
            if(collector.containsKey(rid)){
                responseStr = collector.get(rid);
                collector.remove(rid);

                logger.info("--->API http request found response in collector!");

                ctx.getRequest().setAttribute("status",200);
                ctx.getRequest().setAttribute("response", responseStr);
                ctx.getRequest().setAttribute("endTime",System.currentTimeMillis());
                break;
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        ctx.complete();
    }

}
4

0 回答 0