50

问题描述

Servlet-3.0 API 允许分离请求/响应上下文并在稍后回复它。

但是,如果我尝试编写大量数据,例如:

AsyncContext ac = getWaitingContext() ;
ServletOutputStream out = ac.getResponse().getOutputStream();
out.print(some_big_data);
out.flush()

对于 Tomcat 7 和 Jetty 8,它实际上可能会阻塞——而且它确实会阻塞在琐碎的测试用例中。教程建议创建一个可以处理这种设置的线程池——这通常与传统的 10K 架构相反。

但是,如果我有 10,000 个打开的连接和一个线程池,比如说 10 个线程,那么即使 1% 的客户端具有低速连接或只是阻塞连接,也足以阻塞线程池并完全阻塞彗星响应或减慢它显著地。

预期的做法是获得“写就绪”通知或 I/O 完成通知,然后继续推送数据。

如何使用 Servlet-3.0 API 来完成,即我如何获得:

  • I/O 操作的异步完成通知。
  • 通过写就绪通知获取非阻塞 I/O。

如果 Servlet-3.0 API 不支持,是否有任何 Web 服务器特定的 API(如 Jetty Continuation 或 Tomcat CometEvent)允许真正异步处理此类事件,而无需使用线程池伪造异步 I/O。

有人知道吗?

如果这不可能,您可以参考文档来确认吗?

示例代码中的问题演示

我附上了下面模拟事件流的代码。

笔记:

  • 它使用ServletOutputStream该 throwsIOException来检测断开连接的客户端
  • 它发送keep-alive消息以确保客户端仍然存在
  • 我创建了一个线程池来“模拟”异步操作。

在这样的示例中,我明确定义了大小为 1 的线程池来显示问题:

  • 启动应用程序
  • 从两个终端运行curl http://localhost:8080/path/to/app(两次)
  • 现在发送数据curd -d m=message http://localhost:8080/path/to/app
  • 两个客户端都收到了数据
  • 现在暂停其中一个客户端 (Ctrl+Z) 并再次发送消息curd -d m=message http://localhost:8080/path/to/app
  • 观察到另一个未挂起的客户端要么什么也没收到,要么在传输消息后停止接收保持活动的请求,因为其他线程被阻塞了。

我想在不使用线程池的情况下解决这样的问题,因为使用 1000-5000 个打开的连接我可以非常快地耗尽线程池。

下面的示例代码。


import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.LinkedBlockingQueue;

import javax.servlet.AsyncContext;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.ServletOutputStream;


@WebServlet(urlPatterns = "", asyncSupported = true)
public class HugeStreamWithThreads extends HttpServlet {

    private long id = 0;
    private String message = "";
    private final ThreadPoolExecutor pool = 
        new ThreadPoolExecutor(1, 1, 50000L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
        // it is explicitly small for demonstration purpose

    private final Thread timer = new Thread(new Runnable() {
        public void run()
        {
            try {
                while(true) {
                    Thread.sleep(1000);
                    sendKeepAlive();
                }
            }
            catch(InterruptedException e) {
                // exit
            }
        }
    });


    class RunJob implements Runnable {
        volatile long lastUpdate = System.nanoTime();
        long id = 0;
        AsyncContext ac;
        RunJob(AsyncContext ac) 
        {
            this.ac = ac;
        }
        public void keepAlive()
        {
            if(System.nanoTime() - lastUpdate > 1000000000L)
                pool.submit(this);
        }
        String formatMessage(String msg)
        {
            StringBuilder sb = new StringBuilder();
            sb.append("id");
            sb.append(id);
            for(int i=0;i<100000;i++) {
                sb.append("data:");
                sb.append(msg);
                sb.append("\n");
            }
            sb.append("\n");
            return sb.toString();
        }
        public void run()
        {
            String message = null;
            synchronized(HugeStreamWithThreads.this) {
                if(this.id != HugeStreamWithThreads.this.id) {
                    this.id = HugeStreamWithThreads.this.id;
                    message = HugeStreamWithThreads.this.message;
                }
            }
            if(message == null)
                message = ":keep-alive\n\n";
            else
                message = formatMessage(message);

            if(!sendMessage(message))
                return;

            boolean once_again = false;
            synchronized(HugeStreamWithThreads.this) {
                if(this.id != HugeStreamWithThreads.this.id)
                    once_again = true;
            }
            if(once_again)
                pool.submit(this);

        }
        boolean sendMessage(String message) 
        {
            try {
                ServletOutputStream out = ac.getResponse().getOutputStream();
                out.print(message);
                out.flush();
                lastUpdate = System.nanoTime();
                return true;
            }
            catch(IOException e) {
                ac.complete();
                removeContext(this);
                return false;
            }
        }
    };

    private HashSet<RunJob> asyncContexts = new HashSet<RunJob>();

    @Override
    public void init(ServletConfig config) throws ServletException
    {
        super.init(config);
        timer.start();
    }
    @Override
    public void destroy()
    {
        for(;;){
            try {
                timer.interrupt();
                timer.join();
                break;
            }
            catch(InterruptedException e) {
                continue;
            }
        }
        pool.shutdown();
        super.destroy();
    }


    protected synchronized void removeContext(RunJob ac)
    {
        asyncContexts.remove(ac);
    }

    // GET method is used to establish a stream connection
    @Override
    protected synchronized void doGet(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {

        // Content-Type header
        response.setContentType("text/event-stream");
        response.setCharacterEncoding("utf-8");

        // Access-Control-Allow-Origin header
        response.setHeader("Access-Control-Allow-Origin", "*");

        final AsyncContext ac = request.startAsync();

        ac.setTimeout(0);
        RunJob job = new RunJob(ac);
        asyncContexts.add(job);
        if(id!=0) {
            pool.submit(job);
        }
    }

    private synchronized void sendKeepAlive()
    {
        for(RunJob job : asyncContexts) {
            job.keepAlive();
        }
    }

    // POST method is used to communicate with the server
    @Override
    protected synchronized void doPost(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException 
    {
        request.setCharacterEncoding("utf-8");
        id++;
        message = request.getParameter("m");        
        for(RunJob job : asyncContexts) {
            pool.submit(job);
        }
    }


}

上面的示例使用线程来防止阻塞......但是,如果阻塞客户端的数量大于线程池的大小,它将阻塞。

如何在没有阻塞的情况下实现它?

4

6 回答 6

29

我发现Servlet 3.0 AsynchronousAPI 很难正确实现,有用的文档很少。经过大量试验和错误并尝试了许多不同的方法,我能够找到一个我非常满意的强大解决方案。当我查看我的代码并将其与您的代码进行比较时,我注意到一个可能会帮助您解决特定问题的主要区别。我使用 aServletResponse来写入数据,而不是 a ServletOutputStream

在这里,我的首选异步 Servlet 类针对您的some_big_data情况稍作调整:

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletResponse;
import javax.servlet.annotation.WebInitParam;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.HttpSession;

import org.apache.log4j.Logger;

@javax.servlet.annotation.WebServlet(urlPatterns = { "/async" }, asyncSupported = true, initParams = { @WebInitParam(name = "threadpoolsize", value = "100") })
public class AsyncServlet extends HttpServlet {

  private static final Logger logger = Logger.getLogger(AsyncServlet.class);

  public static final int CALLBACK_TIMEOUT = 10000; // ms

  /** executor service */
  private ExecutorService exec;

  @Override
  public void init(ServletConfig config) throws ServletException {

    super.init(config);
    int size = Integer.parseInt(getInitParameter("threadpoolsize"));
    exec = Executors.newFixedThreadPool(size);
  }

  @Override
  public void service(HttpServletRequest req, HttpServletResponse res) throws ServletException, IOException {

    final AsyncContext ctx = req.startAsync();
    final HttpSession session = req.getSession();

    // set the timeout
    ctx.setTimeout(CALLBACK_TIMEOUT);

    // attach listener to respond to lifecycle events of this AsyncContext
    ctx.addListener(new AsyncListener() {

      @Override
      public void onComplete(AsyncEvent event) throws IOException {

        logger.info("onComplete called");
      }

      @Override
      public void onTimeout(AsyncEvent event) throws IOException {

        logger.info("onTimeout called");
      }

      @Override
      public void onError(AsyncEvent event) throws IOException {

        logger.info("onError called: " + event.toString());
      }

      @Override
      public void onStartAsync(AsyncEvent event) throws IOException {

        logger.info("onStartAsync called");
      }
    });

    enqueLongRunningTask(ctx, session);
  }

  /**
   * if something goes wrong in the task, it simply causes timeout condition that causes the async context listener to be invoked (after the fact)
   * <p/>
   * if the {@link AsyncContext#getResponse()} is null, that means this context has already timed out (and context listener has been invoked).
   */
  private void enqueLongRunningTask(final AsyncContext ctx, final HttpSession session) {

    exec.execute(new Runnable() {

      @Override
      public void run() {

        String some_big_data = getSomeBigData();

        try {

          ServletResponse response = ctx.getResponse();
          if (response != null) {
            response.getWriter().write(some_big_data);
            ctx.complete();
          } else {
            throw new IllegalStateException(); // this is caught below
          }
        } catch (IllegalStateException ex) {
          logger.error("Request object from context is null! (nothing to worry about.)"); // just means the context was already timeout, timeout listener already called.
        } catch (Exception e) {
          logger.error("ERROR IN AsyncServlet", e);
        }
      }
    });
  }

  /** destroy the executor */
  @Override
  public void destroy() {

    exec.shutdown();
  }
}
于 2012-08-28T11:17:05.310 回答
10

在我研究这个主题的过程中,这个线程不断弹出,所以我想在这里提到它:

ServletInputStreamServlet 3.1 在和上引入了异步操作ServletOutputStream。见ServletOutputStream.setWriteListener

可以在http://docs.oracle.com/javaee/7/tutorial/servlets013.htm找到一个示例

于 2013-09-07T11:13:33.537 回答
3

这可能会有所帮助

http://www.oracle.com/webfolder/technetwork/tutorials/obe/java/async-servlet/async-servlets.html

于 2012-09-25T23:18:15.207 回答
3

我们不能完全导致写入是异步的。实际上,我们必须忍受这样的限制,即当我们向客户写出一些东西时,我们希望能够迅速这样做,并且如果我们不这样做,就能够将其视为错误。也就是说,如果我们的目标是尽可能快地将数据流式传输到客户端,并使用通道的阻塞/非阻塞状态作为控制流量的一种方式,那么我们就不走运了。但是,如果我们以客户端应该能够处理的低速率发送数据,我们至少能够及时断开读取速度不够快的客户端。

例如,在您的应用程序中,我们以较慢的速度(每隔几秒)发送 keepalive,并希望客户端能够跟上他们正在发送的所有事件。我们将数据挥霍给客户端,如果跟不上,我们可以及时干净地断开连接。这比真正的异步 I/O 更受限制,但它应该满足您的需要(顺便说一下,我的)。

诀窍是,所有用于写出输出的方法,这些方法只是抛出 IOExceptions 实际上比这多一点:在实现中,所有对可以被中断()的东西的调用都将用这样的东西包装(取自码头 9):

catch (InterruptedException x)
    throw (IOException)new InterruptedIOException().initCause(x);

(我还注意到这在 Jetty 8中不会发生,其中记录了 InterruptedException 并立即重试阻塞循环。大概您确保您的 servlet 容器表现良好以使用此技巧。)

也就是说,当慢速客户端导致写入线程阻塞时,我们只需通过在线程上调用 interrupt() 来强制将写入作为 IOException 抛出。想一想:非阻塞代码无论如何都会在我们的一个处理线程上消耗一个时间单位来执行,因此使用刚刚中止的阻塞写入(比如说一毫秒之后)在原则上实际上是相同的。我们仍然只是在线程上消耗很短的时间,只是效率稍低。

我已经修改了您的代码,以便主计时器线程在我们开始写入之前运行一个作业以限制每次写入的时间,并且如果写入快速完成,则该作业将被取消,这是应该的。

最后一点:在一个实现良好的 servlet 容器中,导致 I/O 被抛出应该是安全的。如果我们能捕获 InterruptedIOException 并稍后再次尝试写入,那就太好了。如果慢速客户端跟不上完整的流,我们可能想给他们事件的一个子集。据我所知,在 Jetty 中这并不完全安全。如果写入抛出,HttpResponse 对象的内部状态可能不够一致,无法处理以后安全地重新进入写入。我认为尝试以这种方式推送 servlet 容器是不明智的,除非我错过了提供此保证的特定文档。我认为这个想法是,如果发生 IOException,连接被设计为关闭。

下面是代码,修改了 RunJob::run() 版本,使用了一个非常简单的说明(实际上,我们希望在这里使用主计时器线程,而不是每次写入都启动一个,这很愚蠢)。

public void run()
{
    String message = null;
    synchronized(HugeStreamWithThreads.this) {
        if(this.id != HugeStreamWithThreads.this.id) {
            this.id = HugeStreamWithThreads.this.id;
            message = HugeStreamWithThreads.this.message;
        }
    }
    if(message == null)
        message = ":keep-alive\n\n";
    else
        message = formatMessage(message);

    final Thread curr = Thread.currentThread();
    Thread canceller = new Thread(new Runnable() {
        public void run()
        {
            try {
                Thread.sleep(2000);
                curr.interrupt();
            }
            catch(InterruptedException e) {
                // exit
            }
        }
    });
    canceller.start();

    try {
        if(!sendMessage(message))
            return;
    } finally {
        canceller.interrupt();
        while (true) {
            try { canceller.join(); break; }
            catch (InterruptedException e) { }
        }
    }

    boolean once_again = false;
    synchronized(HugeStreamWithThreads.this) {
        if(this.id != HugeStreamWithThreads.this.id)
            once_again = true;
    }
    if(once_again)
        pool.submit(this);

}
于 2013-04-30T23:49:32.820 回答
2

春天适合你吗?Spring-MVC 3.2 有一个名为 的类DeferredResult,它将优雅地处理您的“10,000 个打开连接/10 个服务器池线程”场景。

示例:http: //blog.springsource.org/2012/05/06/spring-mvc-3-2-preview-introducing-servlet-3-async-support/

于 2012-08-30T16:32:10.083 回答
-1

我快速浏览了您的列表,所以我可能遗漏了一些要点。池线程的优点是随着时间的推移在多个任务之间共享线程资源。也许您可以通过间隔不同http连接的keepAlive响应来解决您的问题,而不是同时将所有这些响应分组。

于 2013-02-28T22:23:45.057 回答