我遇到了异步码头响应的问题!我正在使用嵌入式 Jetty 与反应器循环接口,并在循环处理它们时收集响应,然后在答案出现在收集器哈希中时完成 Jetty 请求。
即使收集器哈希中存在某些内容,我也看到关闭上下文的错误-我认为上下文已超时,我试图检测到这一点而不是调用完成,但这不起作用。我尝试检查 ((AsyncContinuation)ctx).isInitial() 以查看上下文是否可以关闭,但它没有做任何事情。
这是错误:
INFO: e0d879983e682c0d6 API Request happens
May 25, 2012 4:39:00 AM server.Reactor$Parser parse
INFO: Received data { data from reactor }
May 25, 2012 4:39:00 AM server.AsyncHTTPRequestProcessor run
INFO: --->API http request in collector:
May 25, 2012 4:39:00 AM server.AsyncHTTPRequestProcessor run
INFO: Collector in async stuff:
2012-05-25 04:39:00.846:WARN:oejut.QueuedThreadPool:
java.lang.IllegalStateException: IDLE,initial
at org.eclipse.jetty.server.AsyncContinuation.complete(AsyncContinuation.java:569)
at server.AsyncHTTPRequestProcessor.run(AsyncHTTPRequestProcessor.java:72)
at org.eclipse.jetty.server.handler.ContextHandler.handle(ContextHandler.java:1119)
at org.eclipse.jetty.server.AsyncContinuation$1.run(AsyncContinuation.java:875)
at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:599)
at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:534)
at java.lang.Thread.run(Thread.java:679)
这是 Jetty 端点:
package server;
import java.io.*;
import java.nio.channels.Selector;
import java.text.MessageFormat;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.continuation.*;
public class APIRequestHandler extends HttpServlet
{
private static final long serialVersionUID = -446741424265391138L;
private ConcurrentHashMap<String, clientconnData> key2clientconn;
private ConcurrentLinkedQueue<APIMessage> queue;
private ConcurrentHashMap<String, String> collector;
private int timeout;
private Logger logger;
private Selector selector;
public APIRequestHandler(Logger logger, Selector selector, ConcurrentLinkedQueue<APIMessage> queue,
ConcurrentHashMap<String, String> collector, ConcurrentHashMap<String, clientconnData> key2clientconn2, int timeout)
{
this.logger = logger;
this.selector = selector;
this.queue = queue;
this.collector = collector;
this.timeout = timeout;
this.key2clientconn = key2clientconn2;
}
public void doGet(HttpServletRequest req, HttpServletResponse res)
throws java.io.IOException
{
//System.out.println("request URI=" + req.getRequestURI());
String rid = req.getParameter("rid");
String clientconnId = req.getParameter("id");
String msg = req.getParameter("json");
// We do a null check to avoid the favicon call and other broken calls
if(clientconnId==null || rid==null || msg==null){
PrintWriter out = res.getWriter();
res.setStatus(200);
res.setContentType("application/json");
out.println("{");
out.println("\"response\":{\"success\":\"false\",\"response\":\"request missing parameters\"}");
out.println("}");
out.close();
}
else if(!this.key2clientconn.containsKey(clientconnId))
{
PrintWriter out = res.getWriter();
res.setStatus(200);
res.setContentType("application/json");
out.println("{");
out.println("\"rid\":"+"\""+rid+"\"");
out.println(",");
out.println("\"rtime\":"+0); // milliseconds
out.println(",");
out.println("\"response\":{\"success\":\"false\",\"response\":\"clientconn with not found on server\"}");
out.println("}");
out.close();
}
else// everything is fine, proceed as normal
{
logger.log(Level.INFO,"From API: "+msg);
// Send to channel
APIMessage m = new APIMessage();
m.rid = rid;
m.id = clientconnId;
m.json = msg + "\r\n";
// create the async context, otherwise getAsyncContext() will be null
final AsyncContext ctx = req.startAsync();
// set the timeout
ctx.setTimeout(this.timeout);
ctx.getRequest().setAttribute("rid",rid);
ctx.getRequest().setAttribute("startTime",System.currentTimeMillis()); // start time
// set up selector/queue
queue.add(m);
this.selector.wakeup();
// attach listener to respond to lifecycle events of this AsyncContext
ctx.addListener(new AsyncListener() {
public void onComplete(AsyncEvent event) throws IOException {
log("onComplete called");
// NOTE: errors and timeouts are handled with other events.
// Successful comm with clientconn handled by AsyncHTTPRequestProcessor.
ServletRequest asyncContextReq = event.getAsyncContext().getRequest();
int httpStatusCode = 500;
if(asyncContextReq.getAttribute("status") != null)
httpStatusCode = (Integer) asyncContextReq.getAttribute("status");
Object response = "{}";
if(asyncContextReq.getAttribute("response") != null)
response = event.getAsyncContext().getRequest().getAttribute("response");
long startTime = (Long) asyncContextReq.getAttribute("startTime");
long endTime = System.currentTimeMillis();
long elapsedTime = endTime - startTime;
// Do the response
HttpServletResponse ctxRes = (HttpServletResponse)event.getAsyncContext().getResponse();
ctxRes.setStatus(httpStatusCode);
ctxRes.setContentType("application/json");
ctxRes.getWriter().println("{");
ctxRes.getWriter().println("\"rid\":"+"\""+asyncContextReq.getParameter("rid")+"\"");
ctxRes.getWriter().println(",");
ctxRes.getWriter().println("\"rtime\":"+elapsedTime); // milliseconds
ctxRes.getWriter().println(",");
ctxRes.getWriter().println("\"response\": "+response);
ctxRes.getWriter().println("}");
ctxRes.getWriter().flush();
}
public void onTimeout(AsyncEvent event) throws IOException {
log("onTimeout called");
// gateway timeout (on clientconn request)
ServletRequest asyncContextReq = event.getAsyncContext().getRequest();
asyncContextReq.setAttribute("status",500); // request timeout
asyncContextReq.setAttribute("response", "{}");
}
public void onError(AsyncEvent event) throws IOException {
log("onError called");
ServletRequest asyncContextReq = event.getAsyncContext().getRequest();
asyncContextReq.setAttribute("status",500); // request timeout
asyncContextReq.setAttribute("response", "{}");
}
public void onStartAsync(AsyncEvent event) throws IOException {
log("onStartAsync called");
}
});
// spawn some task in a background thread
ctx.start(new AsyncHTTPRequestProcessor(ctx,collector,logger));
}
}
}
处理异步响应:
package server;
import java.io.IOException;
import java.text.MessageFormat;
import java.util.Date;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Logger;
import javax.servlet.AsyncContext;
import org.eclipse.jetty.server.AsyncContinuation;
public class AsyncHTTPRequestProcessor implements Runnable {
private ConcurrentHashMap<String, String> collector;
private Logger logger;
private AsyncContext ctx;
private String responseStr = null;
public AsyncHTTPRequestProcessor(AsyncContext _ctx,
ConcurrentHashMap<String, String> _collector, Logger _logger) {
ctx = _ctx;
collector = _collector;
logger = _logger;
}
@Override
public void run() {
logger.info("AsyncContinuation start");
//if(!((AsyncContinuation)ctx).isInitial()){
String rid = (String) ctx.getRequest().getAttribute("rid");
int elapsed = 0;
if(rid !=null)
{
logger.info("AsyncContinuation rid="+rid);
while(elapsed<ctx.getTimeout())
{
if(collector.containsKey(rid)){
responseStr = collector.get(rid);
collector.remove(rid);
logger.info("--->API http request in collector:"+responseStr);
ctx.getRequest().setAttribute("status",200);
ctx.getRequest().setAttribute("response", responseStr);
ctx.getRequest().setAttribute("endTime",System.currentTimeMillis());
break;
}
try {
Thread.sleep(10);
elapsed+=10;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//}
logger.info("Collector in async stuff:");
for(String key:collector.keySet()){
logger.info(key+"->"+collector.get(key));
}
for(Entry<String, String> x:collector.entrySet()){
logger.info(x.getKey()+"->"+x.getValue());
}
ctx.complete();
}
}
}