
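After fixing a synchronization issue in our async servlet, we still occasionally see

java.io.IOException: Closed while Pending/Unready

warnings from Jetty. With the fix above, they dropped from about 90 per day to about 5 per day in our production system. That is rare and looks much better, but something minor is probably still missing.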

Full stack trace:

[jetty-63523] (HttpOutput.java:287)   - 
java.io.IOException: Closed while Pending/Unready
        at org.eclipse.jetty.server.HttpOutput.close(HttpOutput.java:285)
        at org.eclipse.jetty.server.Response.closeOutput(Response.java:1044)
        at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:493)
        at org.eclipse.jetty.server.HttpChannel.run(HttpChannel.java:293)
        at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:762)
        at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:680)
        at java.lang.Thread.run(Thread.java:748)

The only code that writes to the output stream without synchronization is the following; it calls setContentType(), setStatus(), and flushBuffer():

@Override
protected void doPost(final HttpServletRequest req, final HttpServletResponse resp)
        throws ServletException, IOException {
    resp.setContentType(MediaType.OCTET_STREAM.type());
    resp.setStatus(HttpServletResponse.SC_OK);
    resp.setBufferSize(4096);
    resp.flushBuffer();
    final AsyncContext async = req.startAsync();
    async.setTimeout(5_000); // millis
    final ServletOutputStream output = resp.getOutputStream();
    final QueueWriteListener writeListener = new QueueWriteListener(async, output);
    async.addListener(writeListener);
    output.setWriteListener(writeListener);
}

This runs before our QueueWriteListener is set, so if flushBuffer() is synchronous, it should not be a problem.

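In any case, looking at Jetty's source code, Response.flush() calls HttpOutput.flush(), which calls new AsyncFlush().iterate(). That looks suspicious, but when debugging doPost()/flushBuffer(), this case branch was never executed.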

Full code:

@Override
protected void doPost(final HttpServletRequest req, final HttpServletResponse resp)
        throws ServletException, IOException {
    resp.setContentType(MediaType.OCTET_STREAM.type());
    resp.setStatus(HttpServletResponse.SC_OK);
    resp.setBufferSize(4096);
    resp.flushBuffer();
    final AsyncContext async = req.startAsync();
    async.setTimeout(5_000); // millis
    final ServletOutputStream output = resp.getOutputStream();
    final QueueWriteListener writeListener = new QueueWriteListener(async, output);
    async.addListener(writeListener);
    output.setWriteListener(writeListener);
}

private static class QueueWriteListener implements AsyncListener, WriteListener {

    private static final Logger logger = LoggerFactory.getLogger(QueueWriteListener.class);

    private final AsyncContext asyncContext;
    private final ServletOutputStream output;

    @GuardedBy("this")
    private boolean completed = false;

    public QueueWriteListener(final AsyncContext asyncContext, final ServletOutputStream output) {
        this.asyncContext = checkNotNull(asyncContext, "asyncContext cannot be null");
        this.output = checkNotNull(output, "output cannot be null");
    }

    @Override
    public void onWritePossible() throws IOException {
        writeImpl();
    }

    private synchronized void writeImpl() throws IOException {
        if (completed) {
            return;
        }
        while (output.isReady()) {
            final byte[] message = getNextMessage();
            if (message == null) {
                output.flush();
                return;
            }
            output.write(message);
        }
    }

    private synchronized void completeImpl() {
        // also stops DataFeederThread from calling bufferArrived
        completed = true;
        asyncContext.complete();
    }

    @Override
    public void onError(final Throwable t) {
        logger.error("Writer.onError", t);
        completeImpl();
    }

    public void dataArrived() {
        try {
            writeImpl();
        } catch (RuntimeException | IOException e) {
            ...
        }
    }

    public void noMoreData() {
        completeImpl();
    }

    @Override
    public synchronized void onComplete(final AsyncEvent event) throws IOException {
        completed = true; // might not be needed, but does not hurt
    }

    @Override
    public synchronized void onTimeout(final AsyncEvent event) throws IOException {
        completeImpl();
    }

    @Override
    public void onError(final AsyncEvent event) throws IOException {
        logger.error("onError", event.getThrowable());
    }

    ...
}

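So it seems that nothing can write to the output between the moment Jetty completes the request and the moment it (asynchronously) closes the output, so the output's state should not change to pending or unready. Nevertheless, it still happens somehow. What could cause this Closed while Pending/Unready warning?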

I have checked our logs (nothing relevant).

onError(AsyncEvent event) is not (yet) synchronized in our code, but that is not relevant because its log message never shows up in our logs.

Related discussion on GitHub: https://github.com/eclipse/jetty.project/issues/2689

1 Answer


I was able to reproduce the warning with the following code:

@Override
protected void doPost(final HttpServletRequest req, final HttpServletResponse resp)
        throws ServletException, IOException {
    resp.setContentType(MediaType.OCTET_STREAM.type());
    resp.setStatus(HttpServletResponse.SC_OK);
    resp.setBufferSize(8192);
    resp.flushBuffer();
    final AsyncContext async = req.startAsync();
    async.setTimeout(10_000); // millis
    final ServletOutputStream output = resp.getOutputStream();
    final QueueWriteListener writeListener = new QueueWriteListener(output);
    async.addListener(writeListener);
    output.setWriteListener(writeListener);
}

private static class QueueWriteListener implements AsyncListener, WriteListener {

    private static final Logger logger = LoggerFactory.getLogger(QueueWriteListener.class);

    private final ServletOutputStream output;

    public QueueWriteListener(final ServletOutputStream output) {
        this.output = checkNotNull(output, "output cannot be null");
    }

    @Override
    public void onWritePossible() throws IOException {
        logger.info("onWritePossible()");
        long written = 0;
        while (output.isReady()) {
            final byte[] data = new byte[8 * 1024 * 1024];
            Arrays.fill(data, (byte) 'W');
            output.write(data);
            written += data.length;
            logger.info("write OK, written: {} KB", written / 1024);
        }
        logger.info("onWritePossible() end");
    }

    @Override
    public void onError(final Throwable t) {
        logger.info("Writer.onError -Error: {}, Message: {}", t.getClass().getName(), t.getMessage());
    }

    @Override
    public void onComplete(final AsyncEvent event) throws IOException {
        logger.warn("onComplete: {}", event);
    }

    @Override
    public void onTimeout(final AsyncEvent event) throws IOException {
        logger.warn("onTimeout()");
    }

    @Override
    public void onError(final AsyncEvent event) throws IOException {
        logger.error("onError: {}", event, event.getThrowable());
    }

    @Override
    public void onStartAsync(final AsyncEvent event) throws IOException {
        logger.info("onStartAsync: {}", event);
    }
}

together with a slow curl client:

curl --limit 16 -XPOST http://localhost:35419/example2

Result:

10:39:29,063  INFO [jetty-16] {Example2Servlet.java:52} - onWritePossible()
10:39:29,084  INFO [jetty-16] {Example2Servlet.java:59} - write OK, written: 8192 KB
10:39:29,084  INFO [jetty-16] {Example2Servlet.java:61} - onWritePossible() end
10:39:39,085  WARN [jetty-17] {Example2Servlet.java:76} - onTimeout()
10:39:39,088  WARN [jetty-17] {HttpOutput.java:286} - java.io.IOException: Closed while Pending/Unready, requestUrl=http://localhost:35419/example2
10:39:39,090  INFO [jetty-17] {HttpOutput.java:287} - 
java.io.IOException: Closed while Pending/Unready, requestUrl=http://localhost:35419/example2
    at org.eclipse.jetty.server.HttpOutput.close(HttpOutput.java:285)
    at org.eclipse.jetty.server.Response.closeOutput(Response.java:1044)
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:493)
    at org.eclipse.jetty.server.HttpChannel.run(HttpChannel.java:293)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:762)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:680)
    at java.lang.Thread.run(Thread.java:748)
10:39:39,091  WARN [jetty-17] {Example2Servlet.java:71} - onComplete: org.eclipse.jetty.server.AsyncContextEvent@3b51901
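
One way to read the log above: onWritePossible() ends after a single 8 MB write, so isReady() must have returned false, meaning the write was still in flight. Ten seconds later the async timeout fires and the output is closed while that write has not finished. The sketch below replays this sequence with a toy state machine. It is not Jetty's HttpOutput: the AsyncOutputModel class, its four states, and its transitions are assumptions made only to illustrate the order of events.

```java
// Toy model of an async output's write state. This is NOT Jetty's HttpOutput:
// the class, its states, and its transitions are assumptions for illustration.
final class AsyncOutputModel {
    enum State { READY, PENDING, UNREADY, CLOSED }

    private final int bufferSize; // bytes the model accepts without waiting for the client
    private State state = State.READY;

    AsyncOutputModel(final int bufferSize) {
        this.bufferSize = bufferSize;
    }

    // Plays the role of ServletOutputStream.isReady(): false means a write is still in flight.
    boolean isReady() {
        if (state == State.PENDING) {
            state = State.UNREADY; // the caller now has to wait for a write-possible callback
        }
        return state == State.READY;
    }

    // A write larger than the buffer cannot finish until the slow client reads the data.
    void write(final int length) {
        if (state != State.READY) {
            throw new IllegalStateException("write while " + state);
        }
        if (length > bufferSize) {
            state = State.PENDING;
        }
    }

    // Returns the warning text if the output is closed with a write in flight, else null.
    String close() {
        final State before = state;
        state = State.CLOSED;
        return (before == State.PENDING || before == State.UNREADY)
                ? "Closed while Pending/Unready" : null;
    }

    public static void main(final String[] args) {
        final AsyncOutputModel output = new AsyncOutputModel(8192);
        while (output.isReady()) {
            output.write(8 * 1024 * 1024); // same array size as the reproduction
        }
        // Nothing drains the write before the async timeout, so the container closes the output.
        System.out.println(output.close());
    }
}
```

In this model the loop performs exactly one write, the second isReady() call returns false, and close() then reports the same message as the warning, which matches the order of the log lines.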

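Notes:

  • It does not use any third-party threads.
  • The original code had a complete() call in onTimeout(), but it does not affect the behavior, so I removed it.
  • I also removed the synchronized keyword from the method declarations; it does not affect the behavior.
  • I was able to reproduce the same behavior with different buffer and array sizes; the warning shows up sooner or later. (For example, a buffer size of 8192 bytes and an array size of 1024 bytes also reproduces the warning, but it takes more time.) This usually produces additional debug logs like the following: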

    DEBUG [jetty-12] {HttpOutput.java:1271} - EOF of org.eclipse.jetty.server.HttpOutput$AsyncWrite@4b8dff34[PROCESSING]
    
Answered 2018-08-01T09:13:56.687