4

我目前正在尝试编写一个自定义流代理(让我们以这种方式调用它),它可以更改给定输入流的内容,并在必要时生成修改后的输出。这个要求是非常必要的,因为有时我必须修改我的应用程序中的流(例如,实时压缩数据。下面的类非常简单,它使用内部缓冲。

private static class ProxyInputStream extends InputStream {

    private final InputStream iStream;
    private final byte[] iBuffer = new byte[512];

    private int iBufferedBytes;

    private final ByteArrayOutputStream oBufferStream;
    private final OutputStream oStream;

    private byte[] oBuffer = emptyPrimitiveByteArray;
    private int oBufferIndex;

    ProxyInputStream(InputStream iStream, IFunction<OutputStream, ByteArrayOutputStream> oStreamFactory) {
        this.iStream = iStream;
        oBufferStream = new ByteArrayOutputStream(512);
        oStream = oStreamFactory.evaluate(oBufferStream);
    }

    @Override
    public int read() throws IOException {
        if ( oBufferIndex == oBuffer.length ) {
            iBufferedBytes = iStream.read(iBuffer);
            if ( iBufferedBytes == -1 ) {
                return -1;
            }
            oBufferIndex = 0;
            oStream.write(iBuffer, 0, iBufferedBytes);
            oStream.flush();
            oBuffer = oBufferStream.toByteArray();
            oBufferStream.reset();
        }
        return oBuffer[oBufferIndex++];
    }

}

假设我们还有一个示例测试输出流,它只是在每个写入字节(“abc”->“ab c”)之前添加一个空格字符,如下所示:

private static class SpacingOutputStream extends OutputStream {

    private final OutputStream outputStream;

    SpacingOutputStream(OutputStream outputStream) {
        this.outputStream = outputStream;
    }

    @Override
    public void write(int b) throws IOException {
        outputStream.write(' ');
        outputStream.write(b);
    }

}

以及以下测试方法:

private static void test(final boolean useDeflater) throws IOException {
    final FileInputStream input = new FileInputStream(SOURCE);
    final IFunction<OutputStream, ByteArrayOutputStream> outputFactory = new IFunction<OutputStream, ByteArrayOutputStream>() {
        @Override
        public OutputStream evaluate(ByteArrayOutputStream outputStream) {
            return useDeflater ? new DeflaterOutputStream(outputStream) : new SpacingOutputStream(outputStream);
        }
    };
    final InputStream proxyInput = new ProxyInputStream(input, outputFactory);
    final OutputStream output = new FileOutputStream(SOURCE + ".~" + useDeflater);
    int c;
    while ( (c = proxyInput.read()) != -1 ) {
        output.write(c);
    }
    output.close();
    proxyInput.close();
}

该测试方法只是读取文件内容并将其写入另一个流,这可能可以以某种方式进行修改。如果测试方法与 一起运行useDeflater=false,则预期的方法可以正常工作。但是,如果在 set on 的情况下调用测试方法useDeflater,它的行为会非常奇怪,并且几乎什么都不写(如果省略 header 78 9C)。我怀疑 deflater 类的设计可能不符合我喜欢使用的方法,但我始终相信 ZIP 格式和 deflate 压缩旨在即时工作。

可能我在某些时候对放气压缩算法的细节有误。我真的想念什么?..也许可以有另一种方法来编写“流代理”以完全按照我想要的方式工作...如何动态压缩仅受流限制的数据?

提前致谢。


UPD:以下基本版本与 deflater 和 inflater 配合使用非常好:

public final class ProxyInputStream<OS extends OutputStream> extends InputStream {

private static final int INPUT_BUFFER_SIZE = 512;
private static final int OUTPUT_BUFFER_SIZE = 512;

private final InputStream iStream;
private final byte[] iBuffer = new byte[INPUT_BUFFER_SIZE];
private final ByteArrayOutputStream oBufferStream;
private final OS oStream;
private final IProxyInputStreamListener<OS> listener;

private byte[] oBuffer = emptyPrimitiveByteArray;
private int oBufferIndex;
private boolean endOfStream;

private ProxyInputStream(InputStream iStream, IFunction<OS, ByteArrayOutputStream> oStreamFactory, IProxyInputStreamListener<OS> listener) {
    this.iStream = iStream;
    oBufferStream = new ByteArrayOutputStream(OUTPUT_BUFFER_SIZE);
    oStream = oStreamFactory.evaluate(oBufferStream);
    this.listener = listener;
}

public static <OS extends OutputStream> ProxyInputStream<OS> proxyInputStream(InputStream iStream, IFunction<OS, ByteArrayOutputStream> oStreamFactory, IProxyInputStreamListener<OS> listener) {
    return new ProxyInputStream<OS>(iStream, oStreamFactory, listener);
}

@Override
public int read() throws IOException {
    if ( oBufferIndex == oBuffer.length ) {
        if ( endOfStream ) {
            return -1;
        } else {
            oBufferIndex = 0;
            do {
                final int iBufferedBytes = iStream.read(iBuffer);
                if ( iBufferedBytes == -1 ) {
                    if ( listener != null ) {
                        listener.afterEndOfStream(oStream);
                    }
                    endOfStream = true;
                    break;
                }
                oStream.write(iBuffer, 0, iBufferedBytes);
                oStream.flush();
            } while ( oBufferStream.size() == 0 );
            oBuffer = oBufferStream.toByteArray();
            oBufferStream.reset();
        }
    }
    return !endOfStream || oBuffer.length != 0 ? (int) oBuffer[oBufferIndex++] & 0xFF : -1;
}

}

4

3 回答 3

4

我不相信这DeflaterOutputStream.flush()有什么意义。deflater 将累积数据,直到它有一些东西要写入底层流。强制剩余数据位的唯一方法是调用DeflaterOutputStream.finish(). 但是,这不适用于您当前的实现,因为在您完全完成编写之前您不能调用完成。

实际上很难编写压缩流并在同一个线程中读取它。在RMIIO项目中,我实际上是这样做的,但是您需要一个任意大小的中间输出缓冲区(并且您基本上需要将数据推入,直到另一端出现压缩,然后您才能读取它)。您也许可以使用该项目中的一些 util 类来完成您想做的事情。

于 2012-01-17T17:04:47.203 回答
3

为什么不使用 GZipOutputStream?

我有点失落。outputStream但是当我不想压缩和想要压缩时,我应该简单地使用原始new GZipOutputStream(outputStream)文件。就这样。无论如何,请检查您是否正在刷新输出流。

Gzip 与 zip

另外:一件事是 GZIP(压缩流,这就是您正在做的事情),另一件事是编写有效的 zip 文件(文件标题、文件目录、条目(标题、数据)*)。检查ZipOutputStream

于 2012-01-17T16:58:28.283 回答
1

小心,如果你在某个地方使用了方法 int read(byte b[], int off, int len),并且出现异常情况 final int iBufferedBytes = iStream.read(iBuffer);

你会陷入无限循环

于 2017-07-25T12:12:47.763 回答