1

当我使用 Channels.newChannel(is) 从 InputStream 创建一个通道时,java 标准库返回一个 ReadableByteChannelImpl,即:

   private static class ReadableByteChannelImpl
       extends AbstractInterruptibleChannel    // Not really interruptible
       implements ReadableByteChannel
   {
       InputStream in;
       private static final int TRANSFER_SIZE = 8192;
       private byte buf[] = new byte[0];
       private boolean open = true;
       private Object readLock = new Object();

       ReadableByteChannelImpl(InputStream in) {
           this.in = in;
       }

       public int read(ByteBuffer dst) throws IOException {
           int len = dst.remaining();
           int totalRead = 0;
           int bytesRead = 0;
           synchronized (readLock) {
               while (totalRead < len) {
                   int bytesToRead = Math.min((len - totalRead),
                                              TRANSFER_SIZE);
                   if (buf.length < bytesToRead)
                       buf = new byte[bytesToRead];
                   if ((totalRead > 0) && !(in.available() > 0))
                       break; // block at most once
                   try {
                       begin();
                       bytesRead = in.read(buf, 0, bytesToRead);
                   } finally {
                       end(bytesRead > 0);
                   }
                   if (bytesRead < 0)
                       break;
                   else
                       totalRead += bytesRead;
                   dst.put(buf, 0, bytesRead);
               }
               if ((bytesRead < 0) && (totalRead == 0))
                   return -1;

               return totalRead;
           }
       }

       protected void implCloseChannel() throws IOException {
           in.close();
           open = false;
       }
   }

如您所见,它在第一次调用 read(ByteBuffer dst) 时会阻塞,并且永远不会再次阻塞。看:

           if ((totalRead > 0) && !(in.available() > 0))
               break; // block at most once

这种奇怪行为背后的原因是什么?

此外,扩展 AbstractInterruptibleChannel 而不实际使该通道真正可中断的动机是什么?

4

1 回答 1

1

如果它已经读取了至少一个字节并且底层流宣布没有可用的字节,它将不会阻塞。请注意,InputStream#available()即使某些字节可用,它也可以返回零,但它不应该承诺比没有阻塞的情况下可以读取更多的字节。因此,这ReadableByteChannel会努力读取至少一个字节——假设提供的ByteBuffer有至少一个字节的空间——并且这样做之后,除非流承诺有更多字节,否则不会尝试再次读取底层流无阻塞可用。

至于为什么要ReadableByteChannelImpl扩展AbstractInterruptibleChannel,我怀疑是为了确保包装InputStream在调用时正确关闭Channel#close(),其合同由InterruptibleChannel#close(). 扩展AbstractInterruptibleChannel允许ReadableByteChannelImpl借用其线程安全的开闭状态保护。

正如您所说,这有点虚假广告,不是真正可中断的,但它可以容忍从单独的线程关闭,并使这样做是幂等的。

于 2009-12-12T22:02:30.673 回答