2

我们有一个异步 servlet,它从 Jetty 生成以下警告日志:

java.io.IOException: Closed while Pending/Unready

启用调试日志后,我得到以下堆栈跟踪:

WARN [jetty-25948] (HttpOutput.java:278)   - java.io.IOException: Closed while Pending/Unready
DEBUG [jetty-25948] (HttpOutput.java:279)   - 
java.io.IOException: Closed while Pending/Unready
        at org.eclipse.jetty.server.HttpOutput.close(HttpOutput.java:277) ~[jetty-server.jar:9.4.8.v20171121]
        at org.eclipse.jetty.server.Response.closeOutput(Response.java:1044) [jetty-server.jar:9.4.8.v20171121]
        at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:488) [jetty-server.jar:9.4.8.v20171121]
        at org.eclipse.jetty.server.HttpChannel.run(HttpChannel.java:293) [jetty-server.jar:9.4.8.v20171121]
        at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:708) [jetty-util.jar:9.4.8.v20171121]
        at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:626) [jetty-util.jar:9.4.8.v20171121]
        at java.lang.Thread.run(Thread.java:748) [na:1.8.0_151]

它没有太大帮助。

只有在 Jetty 调用了onTimeout()我们的AsyncListener. QA 有时可以通过kill -9在客户端应用程序上使用来重现它。

如何使用示例 servlet 客户端代码重现此警告?我想在比我们的生产代码更简单的环境中理解这个问题,以便之后能够修复生产代码。示例 servlet 应该如何表现?是否可以在同一个 JUnit 测试中使用 Apache Commons HttpClient 客户端重现它?(这对于编写没有复杂网络黑客攻击的集成测试非常有用,例如kill -9。)

我尝试了一些方法来实现示例异步 servlet 和客户端,但均未成功。我认为附加此代码不会有太大帮助,但如果有人感兴趣,我可以这样做。

码头版本:9.4.8.v20171121

更新(2018-06-27):

考虑到@Joakim Erdfelt 的有用回答,我没有close()在我们的代码中找到任何调用,但发现了可疑的同步丢失。这是我们的异步轮询 servlet 的基础:

public class QueuePollServlet extends HttpServlet {

    public QueuePollServlet() {
    }

    @Override
    protected void doPost(final HttpServletRequest req, final HttpServletResponse resp)
            throws ServletException, IOException {
        resp.setContentType(MediaType.OCTET_STREAM.type());
        resp.setStatus(HttpServletResponse.SC_OK);
        resp.flushBuffer();
        final AsyncContext async = req.startAsync();
        async.setTimeout(30_000);
        final ServletOutputStream output = resp.getOutputStream();
        final QueueWriteListener writeListener = new QueueWriteListener(async, output);
        async.addListener(writeListener);
        output.setWriteListener(writeListener);
    }

    private static class QueueWriteListener implements AsyncListener, WriteListener {

        private final AsyncContext asyncContext;
        private final ServletOutputStream output;

        public QueueWriteListener(final AsyncContext asyncContext, final ServletOutputStream output) {
            this.asyncContext = checkNotNull(asyncContext, "asyncContext cannot be null");
            this.output = checkNotNull(output, "output cannot be null");
        }

        @Override
        public void onWritePossible() throws IOException {
            writeImpl();
        }

        private synchronized void writeImpl() throws IOException {
            while (output.isReady()) {
                final byte[] message = getNextMessage();
                if (message == null) {
                    output.flush();
                    return;
                }
                output.write(message);
            }
        }

        private void completeImpl() {
            asyncContext.complete();
        }

        public void dataArrived() {
            try {
                writeImpl();
            } catch (IOException e) {
                ...
            }
        }

        public void noMoreBuffers() {
            completeImpl();
        }

        @Override
        public void onTimeout(final AsyncEvent event) throws IOException {
            completeImpl();
        }

        @Override
        public void onError(final Throwable t) {
            logger.error("Writer.onError", t);
            completeImpl();
        }


        ...
    }
}

可能的竞争条件:

  1. DataFeederThread: 调用dataArrived()-> writeImpl(),然后它得到的output.isReady()true.
  2. onTimeout()完成上下文的Jetty 调用。
  3. DataFeederThread:output.write()在 while 循环中调用但找到了完整的上下文。

这种情况会导致Closed while Pending/Unready警告还是另一个问题?我是对的,制造completeImpl() synchronized解决了问题还是有其他需要关心的事情?

更新(2018-06-28):

我们在以下代码段中也有类似的onError实现:QueueWriteListener

@Override
public void onError(final Throwable t) {
    logger.error("Writer.onError", t);
    completeImpl();
}

onError无论如何,日志消息周围没有错误日志Closed while Pending/Unready(查看每个两个小时的时间范围),只有 EOF,如下面的来自我们的DataFeederThread

DEBUG [DataFeederThread] (HttpOutput.java:224) - 
org.eclipse.jetty.io.EofException: null
        at org.eclipse.jetty.server.HttpConnection$SendCallback.reset(HttpConnection.java:704) ~[jetty-server.jar:9.4.8.v20171121]
        at org.eclipse.jetty.server.HttpConnection$SendCallback.access$300(HttpConnection.java:668) ~[jetty-server.jar:9.4.8.v20171121]
        at org.eclipse.jetty.server.HttpConnection.send(HttpConnection.java:526) ~[jetty-server.jar:9.4.8.v20171121]
        at org.eclipse.jetty.server.HttpChannel.sendResponse(HttpChannel.java:778) ~[jetty-server.jar:9.4.8.v20171121]
        at org.eclipse.jetty.server.HttpChannel.write(HttpChannel.java:834) ~[jetty-server.jar:9.4.8.v20171121]
        at org.eclipse.jetty.server.HttpOutput.write(HttpOutput.java:234) [jetty-server.jar:9.4.8.v20171121]
        at org.eclipse.jetty.server.HttpOutput.write(HttpOutput.java:218) [jetty-server.jar:9.4.8.v20171121]
        at org.eclipse.jetty.server.HttpOutput.flush(HttpOutput.java:392) [jetty-server.jar:9.4.8.v20171121]
        at com.example.QueuePollServlet$QueueWriteListener.writeImpl()
        at com.example.QueuePollServlet$QueueWriteListener.dataArrived()


DEBUG [DataFeederThread] (QueuePollServlet.java:217) - messageArrived exception
org.eclipse.jetty.io.EofException: Closed
        at org.eclipse.jetty.server.HttpOutput.write(HttpOutput.java:476) ~[jetty-server.jar:9.4.8.v20171121]
        at com.example.QueuePollServlet$QueueWriteListener.writeImpl()
        at com.example.QueuePollServlet$QueueWriteListener.dataArrived()
4

2 回答 2

3

Closed while Pending/Unready可以通过一些手动调试和普通客户端来重现警告curl。我已经使用 Jetty 9.4.8.v20171121 和 Jetty 9.4.11.v20180605 对其进行了测试。您需要两个断点,因此在测试环境中可靠地重现并不容易。

  1. 第一个断点位于HttpOutput.write()方法中,在它的状态从 更改READYPENDING之后:

    case READY:
        if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
            continue;
        // put a breakpoint here
    
  2. 第二个在Response.closeOutput()

    case STREAM:
        // put a breakpiont here
        if (!_out.isClosed())
            getOutputStream().close();
        break;
    

重现步骤:

  1. [JettyThread1]QueueWriteListener.onWritePossible()将几个字节写入输出然后返回的调用(因为其输入缓冲区为空)。
  2. 等待 onTimeout 事件。
  3. [JettyThread2]HttpChannelState.onTimeout()调用QueueWriteListener.onTimeout()哪个调用asyncContext.complete().
  4. [JettyThread2]HttpChannelState.onTimeout()在异步超时后安排调度。
  5. [JettyThread2] 在第二个断点处暂停
  6. [DFT]DataFeederThread调用writeImpl()
  7. [DFT]DataFeederThread调用HttpOutput.write()(输出流)
  8. [DFT]HttpOutput.write()将其状态从 更改READYPENDING
  9. [DFT]DataFeederThread在这里暂停,由于上面的断点
  10. [JettyThread2] 调度调度关闭输出流并产生Closed while Pending/Unready警告。

所以,实际上是 Jetty 关闭了这个(Jetty 9.4.8.v20171121)堆栈上的输出流:

Thread [jetty-19] (Suspended)   
    Response.closeOutput() line: 1043   
    HttpChannelOverHttp(HttpChannel).handle() line: 488 
    HttpChannelOverHttp(HttpChannel).run() line: 293    
    QueuedThreadPool.runJob(Runnable) line: 708 
    QueuedThreadPool$2.run() line: 626  
    Thread.run() line: 748  

在侦听器中制作onTimeout() synchronized(以及writeImpl()也是synchronized)无济于事,因为预定的关闭仍然能够与writeImpl(从DataFeederThread)重叠。考虑这种情况:

  1. [JettyThread1]QueueWriteListener.onWritePossible()将几个字节写入输出然后返回的调用(因为其输入缓冲区为空)。
  2. 等待 onTimeout 事件。
  3. [JettyThread2]HttpChannelState.onTimeout()调用QueueWriteListener.onTimeout()哪个调用asyncContext.complete().
  4. [DFT]DataFeederThread调用writeImpl()(由于onTimeout尚未完成而被阻止)
  5. [JettyThread2]QueueWriteListener.onTimeout()完成
  6. [DFT]writeImpl()可以运行
  7. [JettyThread2]HttpChannelState.onTimeout()在异步超时后安排调度。
  8. [JettyThread2] 在第二个断点处暂停
  9. [DFT]DataFeederThread调用HttpOutput.write()(输出流)
  10. [DFT]HttpOutput.write()将其状态从 更改READYPENDING
  11. [DFT]DataFeederThread在这里暂停,由于上面的断点
  12. [JettyThread2] 调度调度关闭输出流并产生Closed while Pending/Unready警告。

不幸的是,asnyContext.complete()检查之后是不够的output.isReady()。它在trueJetty 重新打开后返回HttpOutput(请参见下面的堆栈),因此您需要在侦听器中为此设置一个单独的标志。

Thread [jetty-13] (Suspended (access of field _state in HttpOutput))    
    HttpOutput.reopen() line: 195   
    HttpOutput.recycle() line: 923  
    Response.recycle() line: 138    
    HttpChannelOverHttp(HttpChannel).recycle() line: 269    
    HttpChannelOverHttp.recycle() line: 83  
    HttpConnection.onCompleted() line: 424  
    HttpChannelOverHttp(HttpChannel).onCompleted() line: 695    
    HttpChannelOverHttp(HttpChannel).handle() line: 493 
    HttpChannelOverHttp(HttpChannel).run() line: 293    
    QueuedThreadPool.runJob(Runnable) line: 708 
    QueuedThreadPool$2.run() line: 626  
    Thread.run() line: 748  

此外,当输出仍然关闭时(回收/重新打开之前)isReady()也会返回。true相关问题:isReady() 在关闭状态下返回 true - 为什么?

最终的实现是类似的:

@Override
protected void doPost(final HttpServletRequest req, final HttpServletResponse resp)
        throws ServletException, IOException {
    resp.setContentType(MediaType.OCTET_STREAM.type());
    resp.setStatus(HttpServletResponse.SC_OK);
    resp.setBufferSize(4096);
    resp.flushBuffer();
    final AsyncContext async = req.startAsync();
    async.setTimeout(5_000); // millis
    final ServletOutputStream output = resp.getOutputStream();
    final QueueWriteListener writeListener = new QueueWriteListener(async, output);
    async.addListener(writeListener);
    output.setWriteListener(writeListener);
}

private static class QueueWriteListener implements AsyncListener, WriteListener {

    private static final Logger logger = LoggerFactory.getLogger(QueueWriteListener.class);

    private final AsyncContext asyncContext;
    private final ServletOutputStream output;

    @GuardedBy("this")
    private boolean completed = false;

    public QueueWriteListener(final AsyncContext asyncContext, final ServletOutputStream output) {
        this.asyncContext = checkNotNull(asyncContext, "asyncContext cannot be null");
        this.output = checkNotNull(output, "output cannot be null");
    }

    @Override
    public void onWritePossible() throws IOException {
        writeImpl();
    }

    private synchronized void writeImpl() throws IOException {
        if (completed) {
            return;
        }
        while (output.isReady()) {
            final byte[] message = getNextMessage();
            if (message == null) {
                output.flush();
                return;
            }
            output.write(message);
        }
    }

    private synchronized void completeImpl() {
        // also stops DataFeederThread to call bufferArrived
        completed = true;
        asyncContext.complete();
    }

    @Override
    public void onError(final Throwable t) {
        logger.error("Writer.onError", t);
        completeImpl();
    }

    public void dataArrived() {
        try {
            writeImpl();
        } catch (RuntimeException | IOException e) {
            ...
        }
    }

    public void noMoreData() {
        completeImpl();
    }

    @Override
    public synchronized void onComplete(final AsyncEvent event) throws IOException {
        completed = true; // might not needed but does not hurt
    }

    @Override
    public synchronized void onTimeout(final AsyncEvent event) throws IOException {
        completeImpl();
    }

    @Override
    public void onError(final AsyncEvent event) throws IOException {
        logger.error("onError", event.getThrowable());
    }

    ...
}

2018-08-01 更新:实际上它并没有完全修复警告,请参阅:来自 Jetty 的“Closed while Pending/Unready”警告

于 2018-06-29T15:17:57.170 回答
2

您正在使用 Servlet 规范中的异步 I/O 并手动关闭了响应流。

关键事实:close call 意味着写入操作。

在 Jetty 案例中,IOException 告诉您手动调用 Response 流关闭导致了不希望的副作用。

因此,在异步 I/O 模式下,ServletOutputStream.isReady()应该在使用ServletOutputStream.close()验证isReady()为真之前进行调用。

请注意,在任何时候都可能需要允许 a ServletOutputStream.close(),尤其是AsyncContext.complete()在此调度上已被调用的情况下。

但是,如果先前的写入尚未完成和/或ServletOutputStream.isReady()尚未被调用,则允许这种特定的裸写情况ServletOutputStream.close(),但行为是中止响应,丢弃ServletOutputStream.

这就是为什么你得到IOException("Closed while Pending/Unready")

现在您应该问自己为什么要关闭响应流?根据 Servlet 规范,这不是必需的,如果您想使用 将RequestDispatcher多个 Servlet 响应组合在一起,则不希望这样做。

于 2018-06-27T12:28:08.820 回答