8

我编写了包装其他流的后台InputStream(和OutputStream)实现,并在后台线程上预读,主要允许在处理解压缩流的不同线程中进行解压缩/压缩。

这是一个相当标准的生产者/消费者模型。

这似乎是一种通过读取、处理和写入数据的简单进程充分利用多核 CPU 的简单方法,从而更有效地利用 CPU 和磁盘资源。也许“高效”不是最好的词,但与直接从 a 读取ZipInputStream和直接写入a 相比,它提供了更高的利用率,而且我更感兴趣的是,它减少了运行时间ZipOutputStream

我很高兴发布代码,但我的问题是我是否正在重新发明现有(并且更频繁地使用)库中现成的东西?

编辑 - 发布代码...

我的代码BackgroundInputStream如下(BackgroundOutputStream非常相似),但有些方面我想改进。

  1. 看起来我工作太努力了,无法来回传递缓冲区。
  2. 如果调用代码丢弃对 的引用BackgroundInputStream,则backgroundReaderThread将永远存在。
  3. 信号eof需要改进。
  4. 异常应该传播到前台线程。
  5. 我想允许使用提供的线程Executor
  6. close()方法应该向后台线程发出信号,并且不应该关闭包装的流,因为包装的流应该由从中读取的后台线程拥有。
  7. 关门后做一些傻事,比如看书,应该得到适当的照顾。

package nz.co.datacute.io;

import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.concurrent.LinkedBlockingQueue;

public class BackgroundInputStream extends InputStream {
    private static final int DEFAULT_QUEUE_SIZE = 1;
    private static final int DEFAULT_BUFFER_SIZE = 64*1024;
    private final int queueSize;
    private final int bufferSize;
    private volatile boolean eof = false;
    private LinkedBlockingQueue<byte[]> bufferQueue;
    private final InputStream wrappedInputStream;
    private byte[] currentBuffer;
    private volatile byte[] freeBuffer;
    private int pos;

    public BackgroundInputStream(InputStream wrappedInputStream) {
        this(wrappedInputStream, DEFAULT_QUEUE_SIZE, DEFAULT_BUFFER_SIZE);
    }

    public BackgroundInputStream(InputStream wrappedInputStream,int queueSize,int bufferSize) {
        this.wrappedInputStream = wrappedInputStream;
        this.queueSize = queueSize;
        this.bufferSize = bufferSize;
    }

    @Override
    public int read() throws IOException {
        if (bufferQueue == null) {
            bufferQueue = new LinkedBlockingQueue<byte[]>(queueSize);
            BackgroundReader backgroundReader = new BackgroundReader();
            Thread backgroundReaderThread = new Thread(backgroundReader, "Background InputStream");
            backgroundReaderThread.start();
        }
        if (currentBuffer == null) {
            try {
                if ((!eof) || (bufferQueue.size() > 0)) {
                    currentBuffer = bufferQueue.take();
                    pos = 0;
                } else {
                    return -1;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        int b = currentBuffer[pos++];
        if (pos == currentBuffer.length) {
            freeBuffer = currentBuffer;
            currentBuffer = null;
        }
        return b;
    }

    @Override
    public int available() throws IOException {
        if (currentBuffer == null) return 0;
        return currentBuffer.length;
    }

    @Override
    public void close() throws IOException {
        wrappedInputStream.close();
        currentBuffer = null;
        freeBuffer = null;
    }

    class BackgroundReader implements Runnable {

        @Override
        public void run() {
            try {
                while (!eof) {
                    byte[] newBuffer;
                    if (freeBuffer != null) {
                        newBuffer = freeBuffer;
                        freeBuffer = null;
                    } else {
                        newBuffer = new byte[bufferSize];
                    }
                    int bytesRead = 0;
                    int writtenToBuffer = 0;
                    while (((bytesRead = wrappedInputStream.read(newBuffer, writtenToBuffer, bufferSize - writtenToBuffer)) != -1) && (writtenToBuffer < bufferSize)) {
                        writtenToBuffer += bytesRead;
                    }
                    if (writtenToBuffer > 0) {
                        if (writtenToBuffer < bufferSize) {
                            newBuffer = Arrays.copyOf(newBuffer, writtenToBuffer);
                        }
                        bufferQueue.put(newBuffer);
                    }
                    if (bytesRead == -1) {
                        eof = true;
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }
}
4

2 回答 2

3

听起来不错。我从来没有遇到过开箱即用的任何东西,但如果可用的话,尝试使用空闲核心进行压缩是非常有意义的。

也许您可以使用Commons I/O - 这是一个经过良好测试的库,可以帮助处理一些更无聊的东西,让您专注于扩展酷炫的并行部分。也许您甚至可以将您的代码贡献给 Commons 项目;-)

于 2010-01-28T16:50:10.013 回答
0

我会感兴趣的。我已经考虑过一个类似的项目,但无法弄清楚如何处理完成压缩的片段。

于 2010-02-02T04:13:16.933 回答