0

这是一个演示我的问题的 JUnit 测试:

package stream;

import static org.junit.jupiter.api.Assertions.*;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

import org.junit.jupiter.api.Test;

class StreamTest {
    public static class LoopbackStream {
        private final byte[] END_MARKER = new byte[0];
        private final ArrayBlockingQueue<byte[]> queue = new ArrayBlockingQueue<>(1024);

        public OutputStream getOutputStream() {
            return new OutputStream() {
                @Override
                public void write(int b) throws IOException {
                    this.write(new byte[] { (byte) b });
                }

                @Override
                public void write(byte[] b, int off, int len) {
                    try {
                        queue.put(Arrays.copyOfRange(b, off, len - off));
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }

                @Override
                public void close() {
                    try {
                        queue.put(END_MARKER);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            };
        }

        public InputStream getInputStream() {
            return new InputStream() {
                private boolean finished = false;
                private ByteBuffer current = ByteBuffer.wrap(new byte[0]);

                @Override
                public int read() {
                    if (ensureData()) {
                        return Byte.toUnsignedInt(current.get());
                    } else {
                        return -1;
                    }
                }

                @Override
                public int read(byte[] b, int off, int len) {
                    if (ensureData()) {
                        int position = current.position();
                        current.get(b, off, Math.min(len, current.remaining()));
                        return current.position() - position;
                    } else {
                        return -1;
                    }
                }

                private boolean ensureData() {
                    if (!finished && !current.hasRemaining()) {
                        try {
                            byte[] data = queue.take();
                            current = ByteBuffer.wrap(data);
                            finished = data == END_MARKER;
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return false;
                        }
                    }
                    return !finished;
                }
            };
        }

    }

    @Test
    void testVanilla() throws IOException {
        LoopbackStream objectUnderTest = new LoopbackStream();

        PrintWriter pw = new PrintWriter(new OutputStreamWriter(objectUnderTest.getOutputStream()), true);
        BufferedReader br = new BufferedReader(new InputStreamReader(objectUnderTest.getInputStream()));

        pw.println("Hello World!");

        assertEquals("Hello World!", br.readLine());
    }

    @Test
    void testVanilla2() throws IOException {
        LoopbackStream objectUnderTest = new LoopbackStream();

        PrintWriter pw = new PrintWriter(new OutputStreamWriter(objectUnderTest.getOutputStream()), true);
        BufferedReader br = new BufferedReader(new InputStreamReader(objectUnderTest.getInputStream()));

        pw.println("Hello World!");
        assertEquals("Hello World!", br.readLine());

        pw.println("Hello Otherworld!");
        assertEquals("Hello Otherworld!", br.readLine());
    }

    @Test
    void testGzipped() throws IOException {
        LoopbackStream objectUnderTest = new LoopbackStream();

        PrintWriter pw = new PrintWriter(new OutputStreamWriter(new GZIPOutputStream(objectUnderTest.getOutputStream(), true)), true);
        BufferedReader br = new BufferedReader(new InputStreamReader(new GZIPInputStream(objectUnderTest.getInputStream())));

        pw.println("Hello World!");

        assertEquals("Hello World!", br.readLine());
    }
}

有两个单独的测试。一个使用普通输入和输出流(工作正常),另一个将这些流包装在它们的 gzip 等效项中。

我使用了GZIPOutputStream'ssyncFlush选项,我希望在刷新父流时自动刷新流中的所有剩余字节。每当它执行PrintWriter.autoFlushprintln

有没有更好的方法来强制GZIPOutputStream在 a 之后刷新其缓冲区println

4

1 回答 1

1

我知道这不是您问题的完整答案,但是评论太长了...


更新:

经过进一步调查,似乎不是GZIPOutputStream不刷新(通过System.out.println("xy");在方法中添加语句,public void write(byte[] b, int off, int len)您可以看到GZIPOutputStream将两个字节数组写入您的OutputStream: 一个是 gzip 流标头,另一个是第一个的编码内容文本行)。

似乎由于java.io.InputStreamReader(分别使用sun.nio.cs.StreamDecoder它)和GZIPInputStream.

基本上,如果StreamDecoder需要从底层流中读取字节,它会尝试读取尽可能多的字节(只要底层流报告in.available() > 0,这意味着底层流可以在不阻塞的情况下产生更多字节)

这个检查的问题是InflaterInputStream(的超类GZIPInputStream)总是返回1可用字节的数量,即使它的源流没有可用的字节(见源InflaterInputStream.available()

所以看起来虽然你可以逐行写入 a GZIPOutputStream,但你不能轻易地从 a 中逐行读取GZIPInputStream...


原答案:

问题不在于GZIPOutputStream,问题boolean ensureData()在于拒绝读取多个块的方法。

以下测试也因香草流而失败:

@Test
void testVanilla2() throws IOException {
    LoopbackStream objectUnderTest = new LoopbackStream();

    PrintWriter pw = new PrintWriter(new OutputStreamWriter(objectUnderTest.getOutputStream()), true);
    BufferedReader br = new BufferedReader(new InputStreamReader(objectUnderTest.getInputStream()));

    pw.println("Hello World!");
    assertEquals("Hello World!", br.readLine());

    pw.println("Hello Otherworld!");
    assertEquals("Hello Otherworld!", br.readLine());
}

于 2021-03-23T22:25:07.487 回答