3

我在处理 Jetty 的异步响应时遇到了问题!我使用嵌入式 Jetty 与一个 reactor(反应器)循环对接:循环在处理过程中把响应收集到一个收集器哈希表中,一旦某个请求的答案出现在收集器哈希表里,就完成对应的 Jetty 请求。

即使收集器哈希表中已经有了内容,我在关闭(完成)上下文时仍然会看到错误。我猜是上下文已经超时;我尝试检测这种情况并跳过 complete() 调用,但没有成功。我还尝试检查 ((AsyncContinuation)ctx).isInitial() 来判断上下文是否可以关闭,但同样没有任何效果。

这是错误:

INFO: e0d879983e682c0d6 API Request happens
May 25, 2012 4:39:00 AM server.Reactor$Parser parse
INFO: Received data { data from reactor }
May 25, 2012 4:39:00 AM server.AsyncHTTPRequestProcessor run
INFO: --->API http request in collector:
May 25, 2012 4:39:00 AM server.AsyncHTTPRequestProcessor run
INFO: Collector in async stuff:
2012-05-25 04:39:00.846:WARN:oejut.QueuedThreadPool:
java.lang.IllegalStateException: IDLE,initial
    at org.eclipse.jetty.server.AsyncContinuation.complete(AsyncContinuation.java:569)
    at server.AsyncHTTPRequestProcessor.run(AsyncHTTPRequestProcessor.java:72)
    at org.eclipse.jetty.server.handler.ContextHandler.handle(ContextHandler.java:1119)
    at org.eclipse.jetty.server.AsyncContinuation$1.run(AsyncContinuation.java:875)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:599)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:534)
    at java.lang.Thread.run(Thread.java:679)

这是 Jetty 端点:

package server;
import java.io.*;
import java.nio.channels.Selector;
import java.text.MessageFormat;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.eclipse.jetty.continuation.*;



public class APIRequestHandler extends HttpServlet
{
    private static final long serialVersionUID = -446741424265391138L;

    private ConcurrentHashMap<String, clientconnData> key2clientconn;
    private ConcurrentLinkedQueue<APIMessage> queue;
    private ConcurrentHashMap<String, String> collector;
    private int timeout;
    private Logger logger;
    private Selector selector;

    public APIRequestHandler(Logger logger, Selector selector, ConcurrentLinkedQueue<APIMessage> queue,
            ConcurrentHashMap<String, String> collector, ConcurrentHashMap<String, clientconnData> key2clientconn2, int timeout)
    {
        this.logger = logger;
        this.selector = selector;
        this.queue = queue;
        this.collector = collector;
        this.timeout = timeout;
        this.key2clientconn = key2clientconn2;
    }

    public void doGet(HttpServletRequest req, HttpServletResponse res)
            throws java.io.IOException
            {
        //System.out.println("request URI=" + req.getRequestURI());

        String rid = req.getParameter("rid");
        String clientconnId = req.getParameter("id");
        String msg = req.getParameter("json");

        // We do a null check to avoid the favicon call and other broken calls
        if(clientconnId==null || rid==null || msg==null){
            PrintWriter out = res.getWriter();
            res.setStatus(200);
            res.setContentType("application/json");
            out.println("{");
            out.println("\"response\":{\"success\":\"false\",\"response\":\"request missing parameters\"}");
            out.println("}");
            out.close();
        }
        else if(!this.key2clientconn.containsKey(clientconnId))
        {
            PrintWriter out = res.getWriter();
            res.setStatus(200);
            res.setContentType("application/json");
            out.println("{");
            out.println("\"rid\":"+"\""+rid+"\"");
            out.println(",");
            out.println("\"rtime\":"+0); // milliseconds
            out.println(",");
            out.println("\"response\":{\"success\":\"false\",\"response\":\"clientconn with not found on server\"}");
            out.println("}");
            out.close();
        }
        else// everything is fine, proceed as normal
        {
            logger.log(Level.INFO,"From API: "+msg);

            // Send to channel
            APIMessage m = new APIMessage();
            m.rid = rid;
            m.id = clientconnId;
            m.json = msg + "\r\n";

            // create the async context, otherwise getAsyncContext() will be null
            final AsyncContext ctx = req.startAsync();

            // set the timeout
            ctx.setTimeout(this.timeout);
            ctx.getRequest().setAttribute("rid",rid);
            ctx.getRequest().setAttribute("startTime",System.currentTimeMillis()); // start time

            // set up selector/queue
            queue.add(m);
            this.selector.wakeup();

            // attach listener to respond to lifecycle events of this AsyncContext
            ctx.addListener(new AsyncListener() {
                public void onComplete(AsyncEvent event) throws IOException {
                    log("onComplete called");

                    // NOTE: errors and timeouts are handled with other events.
                    // Successful comm with clientconn handled by AsyncHTTPRequestProcessor.            
                    ServletRequest asyncContextReq = event.getAsyncContext().getRequest();

                    int httpStatusCode = 500;
                    if(asyncContextReq.getAttribute("status") != null)
                        httpStatusCode = (Integer) asyncContextReq.getAttribute("status");
                    Object response = "{}";
                    if(asyncContextReq.getAttribute("response") != null)
                        response = event.getAsyncContext().getRequest().getAttribute("response");

                    long startTime = (Long) asyncContextReq.getAttribute("startTime");
                    long endTime = System.currentTimeMillis();
                    long elapsedTime = endTime - startTime;

                    // Do the response
                    HttpServletResponse ctxRes = (HttpServletResponse)event.getAsyncContext().getResponse();

                    ctxRes.setStatus(httpStatusCode);
                    ctxRes.setContentType("application/json");
                    ctxRes.getWriter().println("{");
                    ctxRes.getWriter().println("\"rid\":"+"\""+asyncContextReq.getParameter("rid")+"\"");
                    ctxRes.getWriter().println(",");
                    ctxRes.getWriter().println("\"rtime\":"+elapsedTime); // milliseconds
                    ctxRes.getWriter().println(",");
                    ctxRes.getWriter().println("\"response\": "+response);

                    ctxRes.getWriter().println("}");
                    ctxRes.getWriter().flush();

                }
                public void onTimeout(AsyncEvent event) throws IOException {
                    log("onTimeout called");

                    // gateway timeout (on clientconn request)
                    ServletRequest asyncContextReq = event.getAsyncContext().getRequest();
                    asyncContextReq.setAttribute("status",500); // request timeout
                    asyncContextReq.setAttribute("response", "{}");
                }
                public void onError(AsyncEvent event) throws IOException {
                    log("onError called");

                    ServletRequest asyncContextReq = event.getAsyncContext().getRequest();
                    asyncContextReq.setAttribute("status",500); // request timeout
                    asyncContextReq.setAttribute("response", "{}");
                }
                public void onStartAsync(AsyncEvent event) throws IOException {
                    log("onStartAsync called");
                }
            });

            // spawn some task in a background thread
            ctx.start(new AsyncHTTPRequestProcessor(ctx,collector,logger));
        }
    }
}

处理异步响应:

package server;
import java.io.IOException;
import java.text.MessageFormat;
import java.util.Date;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Logger;

import javax.servlet.AsyncContext;

import org.eclipse.jetty.server.AsyncContinuation;


/**
 * Background task that waits for the answer to one API request to show up in
 * the collector map and then completes the request's {@link AsyncContext}.
 *
 * <p>The request id is read from the {@code rid} request attribute. When an
 * answer is found, the attributes {@code status} (200), {@code response} and
 * {@code endTime} are set on the request before the context is completed.
 */
public class AsyncHTTPRequestProcessor implements Runnable {

    /** Pause, in milliseconds, between two look-ups in the collector. */
    private static final long POLL_INTERVAL_MILLIS = 10L;

    private final ConcurrentHashMap<String, String> collector;
    private final Logger logger;
    private final AsyncContext ctx;

    /**
     * @param _ctx       async context of the request to answer
     * @param _collector map the responses are collected in, keyed by request id
     * @param _logger    logger used for tracing
     */
    public AsyncHTTPRequestProcessor(AsyncContext _ctx,
            ConcurrentHashMap<String, String> _collector, Logger _logger) {
        ctx = _ctx;
        collector = _collector;
        logger = _logger;
    }

    /**
     * Polls the collector until an answer arrives or the context timeout has
     * elapsed, then completes the context.
     *
     * <p>{@code complete()} throws {@link IllegalStateException} when the
     * context is no longer in a completable state (for example because it
     * already timed out). That case is caught and logged instead of escaping
     * into the thread pool.
     */
    @Override
    public void run() {

        logger.info("AsyncContinuation start");

        String rid = null;
        try {
            rid = (String) ctx.getRequest().getAttribute("rid");
            if (rid != null) {
                logger.info("AsyncContinuation rid=" + rid);

                String responseStr = awaitResponse(rid, ctx.getTimeout());
                if (responseStr != null) {
                    logger.info("--->API http request in collector:" + responseStr);
                    ctx.getRequest().setAttribute("status", 200);
                    ctx.getRequest().setAttribute("response", responseStr);
                    ctx.getRequest().setAttribute("endTime", System.currentTimeMillis());
                }

                logger.info("Collector in async stuff:");
                for (Entry<String, String> x : collector.entrySet()) {
                    logger.info(x.getKey() + "->" + x.getValue());
                }
            }
            // Also complete when no rid is present: such a request can never be
            // answered, so there is nothing to wait for.
            ctx.complete();
        } catch (IllegalStateException e) {
            // The context was completed or timed out before this task was done.
            logger.warning("Async context for rid=" + rid
                    + " could not be completed (already completed or timed out): " + e.getMessage());
        }
    }

    /**
     * Waits until the collector holds an answer for {@code rid}.
     *
     * @param rid           request id to look up
     * @param timeoutMillis maximum time to wait, in milliseconds, measured on the
     *                      clock rather than by counting sleeps
     * @return the answer, removed from the collector, or {@code null} if none
     *         arrived in time or the thread was interrupted
     */
    private String awaitResponse(String rid, long timeoutMillis) {
        final long start = System.nanoTime();
        while ((System.nanoTime() - start) / 1000000L < timeoutMillis) {
            // remove() both fetches and deletes the entry in a single atomic step.
            String response = collector.remove(rid);
            if (response != null) {
                return response;
            }
            try {
                Thread.sleep(POLL_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting.
                Thread.currentThread().interrupt();
                return null;
            }
        }
        return null;
    }

}
4

2 回答 2

0

我也遇到了这个问题。如果 AsyncContext 已经关闭,AsyncContext.complete() 方法似乎会抛出 java.lang.IllegalStateException。我认为您应该再检查一下程序,在调用 complete() 方法之前确认 AsyncContext 是否已经关闭。

于 2013-11-07T02:38:03.803 回答
0

我认为您应该在调用 complete() 方法之前再检查一下程序,确认 AsyncContext 是否已经关闭。这解决了我的问题。

于 2014-04-28T10:44:22.917 回答