Question

See the updated question in the Edit section below.

I am trying to decompress large (~300MB) GZIPped files from Amazon S3 on the fly using GZIPInputStream, but it only outputs a portion of the file; however, if I download the file to the filesystem before decompressing, GZIPInputStream decompresses the entire file.

How can I get GZIPInputStream to decompress the entire HTTPInputStream and not just the first part of it?

What I Have Tried

See the update in the Edit section below.

I suspected an HTTP problem, except that no exception is ever thrown, GZIPInputStream returns a fairly consistent chunk of the file each time, and, as far as I can tell, it always breaks on a WET record boundary, although the boundary it picks is different for each URL (which is very strange, since everything is being treated as a binary stream and no parsing of the WET records in the file is happening at all).

The closest question I could find is GZIPInputStream is prematurely closed when reading from s3. The answer to that question was that some GZIP files are actually multiple appended GZIP files and that GZIPInputStream does not handle them well. However, if that is the case, why does GZIPInputStream work fine on a local copy of the file?
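For reference, concatenated GZIP members are a legal format, and GZIPInputStream will normally read across a member boundary as long as the underlying stream already has the bytes on hand (as a ByteArrayInputStream or FileInputStream does). A minimal sketch of that behavior, with an illustrative gzip() helper that is not part of my test code:

import java.io.*;
import java.util.zip.*;

public class ConcatenatedGzipSketch {
    // Compress a string into a single GZIP member
    static byte[] gzip(String s) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(s.getBytes("UTF-8"));
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) throws Exception {
        // Two GZIP members appended back to back, like a WET archive in miniature
        ByteArrayOutputStream concatenated = new ByteArrayOutputStream();
        concatenated.write(gzip("first member\n"));
        concatenated.write(gzip("second member\n"));

        // Decompress from a fully in-memory stream and count the output bytes
        GZIPInputStream gzipis = new GZIPInputStream(
                new ByteArrayInputStream(concatenated.toByteArray()));
        byte[] buffer = new byte[1024];
        int bytesRead;
        int total = 0;
        while ((bytesRead = gzipis.read(buffer, 0, 1024)) != -1) {
            total += bytesRead;
        }
        gzipis.close();
        System.out.println("Decompressed " + total + " bytes"); // 27 bytes: both members
    }
}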

Demonstration Code and Output

Below is a piece of sample code that demonstrates the problem I am seeing. I have tested it with Java 1.8.0_72 and 1.8.0_112 on two different Linux machines on two different networks, with similar results. I expect the byte count from the decompressed HTTPInputStream to be identical to the byte count from the decompressed local copy of the file, but the decompressed HTTPInputStream is much smaller.

Output
Testing URL https://commoncrawl.s3.amazonaws.com/crawl-data/CC-MAIN-2016-50/segments/1480698540409.8/wet/CC-MAIN-20161202170900-00009-ip-10-31-129-80.ec2.internal.warc.wet.gz
Testing HTTP Input Stream direct to GZIPInputStream
Testing saving to file before decompression
Read 87894 bytes from HTTP->GZIP
Read 448974935 bytes from HTTP->file->GZIP
Output from HTTP->GZIP saved to file testfile0.wet
------
Testing URL https://commoncrawl.s3.amazonaws.com/crawl-data/CC-MAIN-2016-50/segments/1480698540409.8/wet/CC-MAIN-20161202170900-00040-ip-10-31-129-80.ec2.internal.warc.wet.gz
Testing HTTP Input Stream direct to GZIPInputStream
Testing saving to file before decompression
Read 1772936 bytes from HTTP->GZIP
Read 451171329 bytes from HTTP->file->GZIP
Output from HTTP->GZIP saved to file testfile40.wet
------
Testing URL https://commoncrawl.s3.amazonaws.com/crawl-data/CC-MAIN-2016-50/segments/1480698541142.66/wet/CC-MAIN-20161202170901-00500-ip-10-31-129-80.ec2.internal.warc.wet.gz
Testing HTTP Input Stream direct to GZIPInputStream
Testing saving to file before decompression
Read 89217 bytes from HTTP->GZIP
Read 453183600 bytes from HTTP->file->GZIP
Output from HTTP->GZIP saved to file testfile500.wet
Sample Code
import java.net.*;
import java.io.*;
import java.util.zip.GZIPInputStream;
import java.nio.channels.*;

public class GZIPTest {
    public static void main(String[] args) throws Exception {
        // Our three test files from CommonCrawl
        URL url0 = new URL("https://commoncrawl.s3.amazonaws.com/crawl-data/CC-MAIN-2016-50/segments/1480698540409.8/wet/CC-MAIN-20161202170900-00009-ip-10-31-129-80.ec2.internal.warc.wet.gz");
        URL url40 = new URL("https://commoncrawl.s3.amazonaws.com/crawl-data/CC-MAIN-2016-50/segments/1480698540409.8/wet/CC-MAIN-20161202170900-00040-ip-10-31-129-80.ec2.internal.warc.wet.gz");
        URL url500 = new URL("https://commoncrawl.s3.amazonaws.com/crawl-data/CC-MAIN-2016-50/segments/1480698541142.66/wet/CC-MAIN-20161202170901-00500-ip-10-31-129-80.ec2.internal.warc.wet.gz");

        /*
         * Test the URLs and display the results
         */
        test(url0, "testfile0.wet");
        System.out.println("------");
        test(url40, "testfile40.wet");
        System.out.println("------");
        test(url500, "testfile500.wet");
    }

    public static void test(URL url, String testGZFileName) throws Exception {
        System.out.println("Testing URL "+url.toString());

        // First directly wrap the HTTPInputStream with GZIPInputStream
        // and count the number of bytes we read
        // Go ahead and save the extracted stream to a file for further inspection
        System.out.println("Testing HTTP Input Stream direct to GZIPInputStream");
        int bytesFromGZIPDirect = 0;
        URLConnection urlConnection = url.openConnection();
        FileOutputStream directGZIPOutStream = new FileOutputStream("./"+testGZFileName);

        // FIRST TEST - Decompress from HTTPInputStream
        GZIPInputStream gzipishttp = new GZIPInputStream(urlConnection.getInputStream());

        byte[] buffer = new byte[1024];
        int bytesRead = -1;
        while ((bytesRead = gzipishttp.read(buffer, 0, 1024)) != -1) {
            bytesFromGZIPDirect += bytesRead;
            directGZIPOutStream.write(buffer, 0, bytesRead); // save to file for further inspection
        }
        gzipishttp.close();
        directGZIPOutStream.close();

        // Now save the GZIPed file locally
        System.out.println("Testing saving to file before decompression");
        int bytesFromGZIPFile = 0;
        ReadableByteChannel rbc = Channels.newChannel(url.openStream());
        FileOutputStream outputStream = new FileOutputStream("./test.wet.gz");
        outputStream.getChannel().transferFrom(rbc, 0, Long.MAX_VALUE);
        outputStream.close();

        // SECOND TEST - decompress from FileInputStream
        GZIPInputStream gzipis = new GZIPInputStream(new FileInputStream("./test.wet.gz"));

        buffer = new byte[1024];
        bytesRead = -1;
        while((bytesRead = gzipis.read(buffer, 0, 1024)) != -1) {
            bytesFromGZIPFile += bytesRead;
        }
        gzipis.close();

        // The Results - these numbers should match but they don't
        System.out.println("Read "+bytesFromGZIPDirect+" bytes from HTTP->GZIP");
        System.out.println("Read "+bytesFromGZIPFile+" bytes from HTTP->file->GZIP");
        System.out.println("Output from HTTP->GZIP saved to file "+testGZFileName);
    }

}

Edit

Per @VGR's comment, closed the streams and associated channels in the demonstration code.

Update

The problem does indeed appear to be specific to the files. I pulled the Common Crawl WET archive down locally (wget), decompressed it (gunzip 1.8), recompressed it (gzip 1.8), and re-uploaded it to S3, and then on-the-fly decompression worked fine. You can see the test if you modify the sample code above to include the following lines:

// Original file from CommonCrawl hosted on S3
URL originals3 = new URL("https://commoncrawl.s3.amazonaws.com/crawl-data/CC-MAIN-2016-50/segments/1480698540409.8/wet/CC-MAIN-20161202170900-00009-ip-10-31-129-80.ec2.internal.warc.wet.gz");
// Recompressed file hosted on S3
URL rezippeds3 = new URL("https://s3-us-west-1.amazonaws.com/com.jeffharwell.commoncrawl.gziptestbucket/CC-MAIN-20161202170900-00009-ip-10-31-129-80.ec2.internal.warc.wet.gz");

test(originals3, "originals3.txt");
test(rezippeds3, "rezippeds3.txt");

The URL rezippeds3 points to the WET archive file that I downloaded, decompressed, recompressed, and then re-uploaded to S3. You will see the following output:

Testing URL https://commoncrawl.s3.amazonaws.com/crawl-data/CC-MAIN-2016-50/segments/1480698540409.8/wet/CC-MAIN-20161202170900-00009-ip-10-31-129-80.ec2.internal.warc.wet.gz
Testing HTTP Input Stream direct to GZIPInputStream
Testing saving to file before decompression
Read 7212400 bytes from HTTP->GZIP
Read 448974935 bytes from HTTP->file->GZIP
Output from HTTP->GZIP saved to file originals3.txt
-----
Testing URL https://s3-us-west-1.amazonaws.com/com.jeffharwell.commoncrawl.gziptestbucket/CC-MAIN-20161202170900-00009-ip-10-31-129-80.ec2.internal.warc.wet.gz
Testing HTTP Input Stream direct to GZIPInputStream
Testing saving to file before decompression
Read 448974935 bytes from HTTP->GZIP
Read 448974935 bytes from HTTP->file->GZIP
Output from HTTP->GZIP saved to file rezippeds3.txt

As you can see, once the file has been recompressed I am able to stream it through GZIPInputStream and get the entire file. The original file still shows the usual premature end of decompression. When I downloaded and uploaded the WET file without recompressing it, I got the same incomplete streaming behavior, so it was definitely the recompression that fixed it. I also put both files, the original and the recompressed one, on a traditional Apache web server and was able to replicate the results, so S3 does not seem to have anything to do with the problem.

So. I have a new question.

New Question

Why does a FileInputStream behave differently than a HTTPInputStream when reading the same content? If it is the exact same file, why does

new GZIPInputStream(urlConnection.getInputStream());

behave differently than

new GZIPInputStream(new FileInputStream("./test.wet.gz"));

?? Isn't an InputStream just an InputStream?

1 Answer

Root Cause Discussion

It turns out that InputStreams can vary quite a bit. In particular, they differ in how they implement the .available() method. For example, ByteArrayInputStream.available() returns the number of bytes remaining in the InputStream. However, HTTPInputStream.available() returns the number of bytes available to read before a blocking IO request needs to be made to refill the buffer. (See the Java docs for more information.)
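As a quick illustration of that difference, here is a minimal sketch (the class name is just illustrative) that uses a PipedInputStream as a stand-in for a network stream: the in-memory stream reports every remaining byte, while the pipe only reports the bytes that have already arrived in its buffer, which can legitimately be 0 in the middle of the stream.

import java.io.*;

public class AvailableContrastSketch {
    public static void main(String[] args) throws Exception {
        // In-memory stream: available() is the total number of bytes left
        InputStream inMemory = new ByteArrayInputStream(new byte[1000]);
        System.out.println(inMemory.available());   // 1000

        // Pipe standing in for a network stream: available() only counts
        // bytes that have already arrived, not bytes still on the way
        PipedOutputStream producer = new PipedOutputStream();
        PipedInputStream consumer = new PipedInputStream(producer);
        System.out.println(consumer.available());   // 0 - nothing has arrived yet
        producer.write(new byte[10]);
        System.out.println(consumer.available());   // 10 - only what is buffered right now
        producer.close();
        consumer.close();
    }
}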

The problem is that GZIPInputStream uses the output of .available() to determine whether an additional GZIP file might be available in the InputStream after it finishes decompressing a complete GZIP file. Here is line 231 from the readTrailer() method of the OpenJDK source file GZIPInputStream.java.

   if (this.in.available() > 0 || n > 26) {

If the HTTPInputStream read buffer happens to empty right at the boundary between two concatenated GZIP files, GZIPInputStream calls .available(), which responds with 0 because it would need to go back to the network to refill the buffer, so GZIPInputStream treats the file as complete and closes prematurely.

The Common Crawl .wet archives are hundreds of megabytes of small concatenated GZIP files, so eventually the HTTPInputStream buffer will empty right at the end of one of the concatenated GZIP files and GZIPInputStream will close prematurely. This explains the problem demonstrated in the question.
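The failure can be reproduced entirely in memory, with no network involved. Below is a minimal sketch (class and helper names are just illustrative) that wraps a concatenated GZIP byte array in a stream whose available() returns 0 and whose read() never crosses the member boundary in a single call, which is exactly the situation an emptied HTTPInputStream buffer can be in; GZIPInputStream then stops after the first member.

import java.io.*;
import java.util.zip.*;

public class GzipBoundarySketch {
    // Simulates a network stream: available() is always 0 and a single read()
    // stops at 'boundary', like a socket that has only received the first
    // GZIP member so far.
    static class BoundaryLimitedStream extends InputStream {
        private final byte[] data;
        private final int boundary;
        private int pos = 0;

        BoundaryLimitedStream(byte[] data, int boundary) {
            this.data = data;
            this.boundary = boundary;
        }

        public int read() {
            return pos < data.length ? (data[pos++] & 0xff) : -1;
        }

        public int read(byte[] b, int off, int len) {
            if (pos >= data.length) return -1;
            int limit = (pos < boundary) ? boundary : data.length;
            int n = Math.min(len, limit - pos);
            System.arraycopy(data, pos, b, off, n);
            pos += n;
            return n;
        }

        public int available() {
            return 0; // like an HTTP read buffer that has just emptied
        }
    }

    // Compress a string into a single GZIP member
    static byte[] gzip(String s) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(s.getBytes("UTF-8"));
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) throws Exception {
        byte[] first = gzip("first member\n");   // 13 bytes uncompressed
        byte[] second = gzip("second member\n"); // 14 bytes uncompressed
        byte[] both = new byte[first.length + second.length];
        System.arraycopy(first, 0, both, 0, first.length);
        System.arraycopy(second, 0, both, first.length, second.length);

        GZIPInputStream gzipis = new GZIPInputStream(new BoundaryLimitedStream(both, first.length));
        byte[] buffer = new byte[1024];
        int bytesRead;
        int total = 0;
        while ((bytesRead = gzipis.read(buffer, 0, 1024)) != -1) {
            total += bytesRead;
        }
        gzipis.close();
        // Prints 13: only the first member is decompressed; the second is silently dropped
        System.out.println("Decompressed " + total + " bytes");
    }
}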

Solution and Workaround

This GIST contains a patch against jdk8u152-b00 revision 12039 and two jtreg tests that remove the (in my humble opinion) incorrect reliance on .available().

If you cannot patch the JDK, the workaround is to make sure that available() always returns > 0, which forces GZIPInputStream to always check for another GZIP file in the stream. Unfortunately, HTTPInputStream is private, so you cannot subclass it directly; instead, extend InputStream and wrap the HTTPInputStream. The code below demonstrates this workaround.

Demonstration Code and Output

The output here shows that when the HTTPInputStream is wrapped, GZIPInputStream produces the same results when reading the concatenated GZIP from a file and when reading it directly from HTTP.

Testing URL https://commoncrawl.s3.amazonaws.com/crawl-data/CC-MAIN-2016-50/segments/1480698540409.8/wet/CC-MAIN-20161202170900-00009-ip-10-31-129-80.ec2.internal.warc.wet.gz
Testing HTTP Input Stream direct to GZIPInputStream
Testing saving to file before decompression
Read 448974935 bytes from HTTP->GZIP
Read 448974935 bytes from HTTP->file->GZIP
Output from HTTP->GZIP saved to file testfile0.wet
------
Testing URL https://commoncrawl.s3.amazonaws.com/crawl-data/CC-MAIN-2016-50/segments/1480698540409.8/wet/CC-MAIN-20161202170900-00040-ip-10-31-129-80.ec2.internal.warc.wet.gz
Testing HTTP Input Stream direct to GZIPInputStream
Testing saving to file before decompression
Read 451171329 bytes from HTTP->GZIP
Read 451171329 bytes from HTTP->file->GZIP
Output from HTTP->GZIP saved to file testfile40.wet
------
Testing URL https://commoncrawl.s3.amazonaws.com/crawl-data/CC-MAIN-2016-50/segments/1480698541142.66/wet/CC-MAIN-20161202170901-00500-ip-10-31-129-80.ec2.internal.warc.wet.gz
Testing HTTP Input Stream direct to GZIPInputStream
Testing saving to file before decompression
Read 453183600 bytes from HTTP->GZIP
Read 453183600 bytes from HTTP->file->GZIP
Output from HTTP->GZIP saved to file testfile500.wet

Here is the demonstration code from the question modified to use the InputStream wrapper.

import java.net.*;
import java.io.*;
import java.util.zip.GZIPInputStream;
import java.nio.channels.*;

public class GZIPTest {
    // Here is a wrapper class that wraps an InputStream
    // but always returns > 0 when .available() is called.
    // This will cause GZIPInputStream to always make another 
    // call to the InputStream to check for an additional 
    // concatenated GZIP file in the stream.
    public static class AvailableInputStream extends InputStream {
        private InputStream is;

        AvailableInputStream(InputStream inputstream) {
            is = inputstream;
        }

        public int read() throws IOException {
            return(is.read());
        }

        public int read(byte[] b) throws IOException {
            return(is.read(b));
        }

        public int read(byte[] b, int off, int len) throws IOException {
            return(is.read(b, off, len));
        }

        public void close() throws IOException {
            is.close();
        }

        public int available() throws IOException {
            // Always say that we have 1 more byte in the
            // buffer, even when we don't
            int a = is.available();
            if (a == 0) {
                return(1);
            } else {
                return(a);
            }
        }
    }



    public static void main(String[] args) throws Exception {
        // Our three test files from CommonCrawl
        URL url0 = new URL("https://commoncrawl.s3.amazonaws.com/crawl-data/CC-MAIN-2016-50/segments/1480698540409.8/wet/CC-MAIN-20161202170900-00009-ip-10-31-129-80.ec2.internal.warc.wet.gz");
        URL url40 = new URL("https://commoncrawl.s3.amazonaws.com/crawl-data/CC-MAIN-2016-50/segments/1480698540409.8/wet/CC-MAIN-20161202170900-00040-ip-10-31-129-80.ec2.internal.warc.wet.gz");
        URL url500 = new URL("https://commoncrawl.s3.amazonaws.com/crawl-data/CC-MAIN-2016-50/segments/1480698541142.66/wet/CC-MAIN-20161202170901-00500-ip-10-31-129-80.ec2.internal.warc.wet.gz");

        /*
         * Test the URLs and display the results
         */
        test(url0, "testfile0.wet");
        System.out.println("------");
        test(url40, "testfile40.wet");
        System.out.println("------");
        test(url500, "testfile500.wet");
    }

    public static void test(URL url, String testGZFileName) throws Exception {
        System.out.println("Testing URL "+url.toString());

        // First directly wrap the HTTP inputStream with GZIPInputStream
        // and count the number of bytes we read
        // Go ahead and save the extracted stream to a file for further inspection
        System.out.println("Testing HTTP Input Stream direct to GZIPInputStream");
        int bytesFromGZIPDirect = 0;
        URLConnection urlConnection = url.openConnection();
        // Wrap the HTTPInputStream in our AvailableInputStream
        AvailableInputStream ais = new AvailableInputStream(urlConnection.getInputStream());
        GZIPInputStream gzipishttp = new GZIPInputStream(ais);
        FileOutputStream directGZIPOutStream = new FileOutputStream("./"+testGZFileName);
        int buffersize = 1024;
        byte[] buffer = new byte[buffersize];
        int bytesRead = -1;
        while ((bytesRead = gzipishttp.read(buffer, 0, buffersize)) != -1) {
            bytesFromGZIPDirect += bytesRead;
            directGZIPOutStream.write(buffer, 0, bytesRead); // save to file for further inspection
        }
        gzipishttp.close();
        directGZIPOutStream.close();

        // Save the GZIPed file locally
        System.out.println("Testing saving to file before decompression");
        ReadableByteChannel rbc = Channels.newChannel(url.openStream());
        FileOutputStream outputStream = new FileOutputStream("./test.wet.gz");
        outputStream.getChannel().transferFrom(rbc, 0, Long.MAX_VALUE);
        outputStream.close();

        // Now decompress the local file and count the number of bytes
        int bytesFromGZIPFile = 0;
        GZIPInputStream gzipis = new GZIPInputStream(new FileInputStream("./test.wet.gz"));

        buffer = new byte[1024];
        while((bytesRead = gzipis.read(buffer, 0, 1024)) != -1) {
            bytesFromGZIPFile += bytesRead;
        }
        gzipis.close();

        // The Results
        System.out.println("Read "+bytesFromGZIPDirect+" bytes from HTTP->GZIP");
        System.out.println("Read "+bytesFromGZIPFile+" bytes from HTTP->file->GZIP");
        System.out.println("Output from HTTP->GZIP saved to file "+testGZFileName);
    }

}
Answered 2017-01-05T02:12:07.820