3

我在这里读到这EntityUtils.consume(httpEntity)将导致将连接释放回连接池,但是当我查看源代码时,我无法理解这是如何发生的。有人可以指出我在使用低级Elastic Search Rest ClientEntityUtils.consume(httpEntity)时释放连接的代码部分 吗?EntityUtils.toString(httpEntity)

如果有 aSocketTimeoutException并且我不使用 ,连接会发生什么HttpEntity

4

1 回答 1

3

客户端关闭和连接释放到池(步骤)

  1. EntityUtils.consume& >如果它完全消耗实体EntityUtils.toString,第一个将close()instream第二个总是调用instream.close()它的finally子句。instream是 InputStream 变量的名称。

  2. instream. close()> 对于这个例子, 的实现InputStream是一个ContentInputStream. 该close()方法ContentInputStream通过代码片段中显示的循环机制强制读取直到结束。

    对该流的调用将导致EOF异常。

    @Override
    public void close() throws IOException 
    {
      final byte tmp[] = new byte[1024];
      /*loop until read() is -1, which means, advance the buffer till is end*/
      while (this.buffer.read(tmp, 0, tmp.length) >= 0) {}
      super.close();
    }
    
  3. Pool> 检查所有汇集的资源状态。此操作可能由某些操作(作为新请求)触发,也可能由底层线程管理。如果一个资源/流被另一端关闭,它将得到一个EOF异常(因为缓冲区被迫前进到最后)。该地点被标记为无效。

  4. Pool> 所有无效点都被回收。它将删除关闭的流并创建新的流,或者在不需要擦除+创建的情况下恢复现有流(取决于资源类型)。这意味着保持流的位置再次可用,并准备好使用新的流:

    连接被释放回池。另一端不再使用它,因此池可以完全控制它。现在允许池删除、恢复它并将其分配给另一个请求者。.


例子

让我们想象一个Pool管理 3 个资源的,例如HttpConnections. 你已经有 3 个线程在使用这个池,所以这些点都被占用了。

同时ThreadZ等待连接释放回池

 (spot1) [HttpConn1] -- ThreadA
 (spot2) [HttpConn2] -- ThreadB
 (spot3) [HttpConn3] -- ThreadC

ThreadA完成了它的工作并关闭了它的连接。当的状态为关闭Pool时会注意到这一点。不同的实现将检查这是不同的方式,其中之一是在尝试从流中读取时遇到异常。其他实现可能有不同的机制来检查资源是否已关闭。如果告诉他的资源是关闭/无效的,将回收这个点。这里有两个选项:PoolEntryPoolEntryEOFPoolEntryPool

a) 擦除和创建。

 (spot1) [HttpConn4] // the pool removes the old one and creates a new connection
 (spot2) [HttpConn2] -- ThreadB
 (spot3) [HttpConn3] -- ThreadC

b) 恢复。

 (spot1) [HttpConn1] // the pool is able to "reset" the existing resource
 (spot2) [HttpConn2] -- ThreadB
 (spot3) [HttpConn3] -- ThreadC

“释放连接”可以翻译为“现在又有一个可用的点/资源”。池现在可以连接到ThreadZ

 (spot1) [HttpConn1] -- ThreadZ
 (spot2) [HttpConn2] -- ThreadB
 (spot3) [HttpConn3] -- ThreadC

consume/ toString- 连接释放

上面所有的解释都意味着调用close() InputStream 触发连接释放

这发生在consume如果实体内容被完全消耗)和toString方法中:

public static void consume(final HttpEntity entity) throws IOException 
{ 
    if (entity == null) 
        return; 
   
    if (entity.isStreaming()) 
    { 
        InputStream instream = entity.getContent(); 
        if (instream != null) 
            instream.close();   // <-- connection release
    } 
} 

public static String toString(final HttpEntity entity, final Charset defaultCharset) 
                              throws IOException, ParseException 
{ 
    Args.notNull(entity, "Entity"); 
    InputStream instream = entity.getContent(); 
    if (instream == null) { 
        return null; 
    } 
    try { 
        Args.check(entity.getContentLength() <= Integer.MAX_VALUE,  
                "HTTP entity too large to be buffered in memory"); 
        int i = (int)entity.getContentLength(); 
        if (i < 0) { 
            i = 4096; 
        } 
        Charset charset = null; 
        try { 
            ContentType contentType = ContentType.getOrDefault(entity); 
            charset = contentType.getCharset(); 
        } catch (UnsupportedCharsetException ex) { 
            throw new UnsupportedEncodingException(ex.getMessage()); 
        } 
        if (charset == null) { 
            charset = defaultCharset; 
        } 
        if (charset == null) { 
            charset = HTTP.DEF_CONTENT_CHARSET; 
        } 
        Reader reader = new InputStreamReader(instream, charset); 
        CharArrayBuffer buffer = new CharArrayBuffer(i); 
        char[] tmp = new char[1024]; 
        int l; 
        while((l = reader.read(tmp)) != -1) { 
            buffer.append(tmp, 0, l); 
        } 
        return buffer.toString(); 
    } finally { 
        instream.close();     // <--- connection release
    } 
} 

如果存在 SocketTimeoutException 并且我不使用 HttpEntity,连接会发生什么情况?

正如您所注意到的,这两种方法都抛出一个IOException, 并SocketTimeoutException从它继承。如果发生这种情况,调用者有责任捕获此异常并设法关闭所有资源。例如:

void tryConsume()
{
   try 
   {
     //...
      EntityUtils.consume(httpEntity);
     //...
   }
   catch (IOException)
   {
     //SocketTimeoutException happened. Log the error,etc
     // (Close resources here...)
   }
   finally
   {
     //...Or maybe include a finally clause and close them here, if you wish 
     // them to be closed regardless of success/failure.
     if (httpEntity!=null)
     {
        InputStream instream = httpEntity.getContent(); 
        if (instream != null) 
            instream.close();   /* <-- connection release. when checking this 
                                 spot, the pool will get (f.e) an EOF 
                                 exception. This will lead to replacing this 
                                 resource with a fresh new connection and 
                                 setting the spot status as avaliable. */
      }
   }
}

请注意,如果SocketTimeoutException抛出 a,特定PoolEntry实现还可以检查资源是否无效,而无需close()调用。Usingclose()保证Pool在正确使用后将回收该点,并且当您能够捕获抛出的异常时,可以将其与“无效标记”一起使用。

但是Pool即使未捕获的资源Exception不允许您专门调用close(),特定实现也将能够检查资源是否无效,因为它们将能够使用不同的机制检查状态。例如,检查连接处于IDLEstate的次数。如果此时间优于 标记的某个阈值Pool,则此点将被回收,而无需close()客户端先前的调用。

这一次Pool将是调用close()它的结束,避免deadlock在客户端出现可能,如果这个不管理最大连接时间或某些异常。

于 2020-12-15T00:33:01.707 回答