3

我正在使用java管道将数据(outstream)从解压缩模块(JavaUncompress类)传递到解析模块(处理程序类),文件很大,我想先解压缩文件并直接解析而不是保存解压缩文件然后解析。但是,它仅适用于小文件。当我输入一个 1G 文件时,似乎只有文件的一部分(比如 50000 行)从输出流到解析模块的输入流。

我尝试使用字符串来保存未压缩的文件,并且发生了同样的事情,字符串仅包含解压缩文件的一部分(与流水线版本相同的第 50000 行停止)。对发生的事情有任何想法吗?非常感谢。

这是我的管道代码:

   {
   PipedInputStream in = new PipedInputStream(); // to output
   final PipedOutputStream out = new PipedOutputStream(in); // out is something from other

   new Thread(
    new Runnable(){
        public void run(){ 
                JavaUncompress.putDataOnOutputStream(inFile,out); }
        }
        ).start();

   doc = handler.processDataFromInputStream(in);
   }

   public static void putDataOnOutputStream(String inZipFileName, PipedOutputStream out){

   try {
          FileInputStream fis = new FileInputStream(inZipFileName);
          //FilterInputStream ftis = new FilterInputStream;
          ZipInputStream zis = new ZipInputStream(new BufferedInputStream(fis));
          ZipEntry entry;

          while((entry = zis.getNextEntry()) != null) {
             System.out.println("Extracting: " +entry);
             byte data[] = new byte[BUFFER];

             long len = entry.getSize();
             long blk = len/BUFFER;
             int  rem = (int)(len - blk*BUFFER);
             System.out.println(len+" = "+blk +"*BUFFER + "+rem);

             for(long i=0; i!=blk; ++i){
                 if ((zis.read(data, 0, BUFFER)) != -1) {
                     out.write(data);
                 }
             }

             byte dataRem[] = new byte[rem];
             if ((zis.read(dataRem, 0, rem)) != -1) {
                 out.write(dataRem);
                 out.flush();
                 out.close();
             }

          }
          zis.close();

       } catch(Exception e) {
          e.printStackTrace();
       }
   }
4

2 回答 2

3

PipedOutputStream.write()如果相应PipedInputStream的得到超过 4096 或它后面的任何字节,将阻塞,但为什么要这样做呢?为什么不直接解压缩文件并在同一个线程中处理它?多线程没有优势,这只是一个毫无意义的复杂化。

在 Java 中,我在 15 年中只使用过一次管道,并且很快将其更改为队列。

于 2012-07-09T23:33:46.000 回答
2

我同意不使用 JDK 的 Pipes 实现,它们太混乱且完全同步,这是一个更快的 BlockingQueue 实现,它借助一个小缓冲区将对上下文切换的影响最小,阻塞队列是完美的对于单个生产者/消费者:

import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.*;

public class QueueOutputStream extends OutputStream
{
  private static final int DEFAULT_BUFFER_SIZE=1024;
  private static final byte[] END_SIGNAL=new byte[]{};

  private final BlockingQueue<byte[]> queue=new LinkedBlockingDeque<>();
  private final byte[] buffer;

  private boolean closed=false;
  private int count=0;

  public QueueOutputStream()
  {
    this(DEFAULT_BUFFER_SIZE);
  }

  public QueueOutputStream(final int bufferSize)
  {
    if(bufferSize<=0){
      throw new IllegalArgumentException("Buffer size <= 0");
    }
    this.buffer=new byte[bufferSize];
  }

  private synchronized void flushBuffer()
  {
    if(count>0){
      final byte[] copy=new byte[count];
      System.arraycopy(buffer,0,copy,0,count);
      queue.offer(copy);
      count=0;
    }
  }

  @Override
  public synchronized void write(final int b) throws IOException
  {
    if(closed){
      throw new IllegalStateException("Stream is closed");
    }
    if(count>=buffer.length){
      flushBuffer();
    }
    buffer[count++]=(byte)b;
  }

  @Override
  public synchronized void write(final byte[] b, final int off, final int len) throws IOException
  {
    super.write(b,off,len);
  }

  @Override
  public synchronized void close() throws IOException
  {
    flushBuffer();
    queue.offer(END_SIGNAL);
    closed=true;
  }

  public Future<Void> asyncSendToOutputStream(final ExecutorService executor, final OutputStream outputStream)
  {
    return executor.submit(
            new Callable<Void>()
            {
              @Override
              public Void call() throws Exception
              {
                try{
                  byte[] buffer=queue.take();
                  while(buffer!=END_SIGNAL){
                    outputStream.write(buffer);
                    buffer=queue.take();
                  }
                  outputStream.flush();
                } catch(Exception e){
                  close();
                  throw e;
                } finally{
                  outputStream.close();
                }
                return null;
              }
            }
    );
  }
于 2013-10-19T11:27:10.930 回答