1

所以这是一个我有点碰壁的任务。在这一点上,我主要希望有更多的眼睛,因为经过多次重做和改进后,我真的看不出我的代码有任何问题。

我们需要用 Java 编写一个可以通过 gzip -d 调用正确解压缩的多线程压缩器。我们不能使用 GZIPOutputStream 调用。相反,我们手动生成了标头和尾标,并使用 Deflater 来压缩数据。我们从标准输入读取并写入标准输出。

基本上,我使用和 Executor 来维护一个线程池。我在输入时读取输入并将其写入设定大小的缓冲区。一旦缓冲区已满,我将该数据块传递给线程(将任务放入队列)。每个线程都有自己的 Deflater,并传递输入和压缩该数据所需的任何其他信息。我还将每个块的最后 32Kb 用作下一个块的字典。

我已经确认我的标题和预告片是正确的。我使用 GZIPOutputStream 压缩文件并使用 hexdump 获取字节,以便将其与输出进行比较。我检查了不同大小的文件,并且标题和尾部是相同的,所以很可能问题出在压缩数据中。我得到的错误是:无效的压缩数据--crc 错误

我已经确认,当我传入一个相对较小的输入时(因此只有一个线程,因为我从未填满缓冲区,队列中只有一个任务)输出是正确的。我可以在压缩数据上调用 gzip -d 并取回完全相同的输入。

换句话说,问题在于当有足够多的数据启动并运行多个线程时。我在大文件的输出上使用了 hexdump 并将其与 GZIPOutputStream 的 hexdump 进行比较,它们非常相似(不完全相同,但即使是小文件的情况,对于压缩数据,hexdump 也略有不同。并且在在这种情况下, gzip -d 仍然有效)。这也是我知道标题和预告片正确的方式。

传入代码转储

import java.lang.Runtime;
import java.lang.String;
import java.lang.Integer;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.nio.ByteBuffer;
import java.io.*;
import java.util.zip.*;

/*Warning: Do not compress files larger than 2GB please. Since this is just
  an assignment and not meant to replace an actual parallel compressor, I cut corners
  by casting longs to ints, since it's easier to convert to 4 bytes*/

public class Main {
    private static final int BLOCK_SIZE = 128*1024;
    private static final int DICT_SIZE = 32*1024;
    private static byte[] header = {(byte)0x1f, (byte)0x8b, (byte)0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00};

    public static void main(String[] args){

    class workerThread implements Callable<byte[]> {
        private boolean lastBlock;
        private boolean dictAvailable;
        private byte[] input;
        private byte[] dictionary;
        private int lastSize;       

        private byte[] output = new byte[BLOCK_SIZE];
        private int compressedLength;
        private ByteArrayOutputStream bOut = new ByteArrayOutputStream();
        Deflater compress = new Deflater (Deflater.DEFAULT_COMPRESSION, true);

        workerThread(boolean last, byte[] blockIn, byte[] dict, boolean useDictionary, int lastBSize){
            this.lastBlock = last;
            this.input = blockIn;
            this.dictionary = dict;
            this.dictAvailable = useDictionary;
            this.lastSize = lastBSize;
        }

        public byte[] call() {
            //System.out.println("running thread ");
            if (lastBlock) {
                // System.out.println("Last block!");
                compress.setInput(input,0,lastSize);
                if(dictAvailable) {
                    compress.setDictionary(dictionary);
                }
                compress.finish();
                compressedLength = compress.deflate(output,0,BLOCK_SIZE,Deflater.SYNC_FLUSH);
            }
            else {
                //System.out.println("Not last block!");
                compress.setInput(input,0,BLOCK_SIZE);
                if(dictAvailable) {
                    compress.setDictionary(dictionary);
                }
                compressedLength = compress.deflate(output,0,BLOCK_SIZE,Deflater.SYNC_FLUSH);
            }
            byte[] finalOut = Arrays.copyOfRange(output,0,compressedLength);
            return finalOut;
        }
    }

    getProcessors p = new getProcessors();
    boolean useDict = true;
    int numProcs = p.getNumProcs();
    boolean customProcs = false;
    boolean foundProcs = false;
    boolean foundDict = false;

    /*Checking if arguments are correct*/
    ........
    /*Correct arguments, proceeding*/

    BufferedInputStream inBytes = new BufferedInputStream(System.in);
    byte[] buff = new byte[BLOCK_SIZE];
    byte[] dict = new byte[DICT_SIZE];
    int bytesRead = 0;
    int offset = 0;
    int uncompressedLength = 0;
    int lastBlockSize = 0;
    boolean isLastBlock = false;
    boolean firstBlockDone = false;

    /*Using an executor with a fixed thread pool size in order to manage threads
    as well as obtain future results to maintain synchronization*/
    ExecutorService exec = Executors.newFixedThreadPool(numProcs);
    CRC32 checksum = new CRC32();
    checksum.reset();
    List<Future<byte[]>> results = new ArrayList<Future<byte[]>>();

    //byte[] temp;

    System.out.write(header,0,header.length);
    try{
        bytesRead = inBytes.read(buff,0, BLOCK_SIZE);
        while (bytesRead != -1)  {
            uncompressedLength += bytesRead;
            checksum.update(buff,offset,bytesRead);
            offset += bytesRead;

            if (offset == BLOCK_SIZE) {
                offset = 0;
                if(!firstBlockDone){ 
                firstBlockDone = true;
                results.add(exec.submit(new workerThread(isLastBlock,buff,dict,false,lastBlockSize)));
                }
                 else {
                     results.add(exec.submit(new workerThread(isLastBlock,buff,dict,useDict,lastBlockSize)));
                 }

                 if (useDict) {
                     System.arraycopy(buff, BLOCK_SIZE-DICT_SIZE, dict, 0, DICT_SIZE);
                 }
            }

             /*Implementation warning! Because of the way bytes are read in, this program will fail if
             the file being zipped is exactly a multiple of 128*1024*/
             if((bytesRead=inBytes.read(buff,offset,BLOCK_SIZE-offset)) == -1) {
                 isLastBlock = true;
                 lastBlockSize = offset;
                 results.add(exec.submit(new workerThread(isLastBlock,buff,dict,useDict,lastBlockSize)));
             }
        }    
        try {
            for(Future<byte[]> result: results) {
            //System.out.println("Got result!");
            System.out.write(result.get(),0,result.get().length);
            //temp = result.get();
            }
        }
        catch (InterruptedException ex) {
            ex.printStackTrace();
            System.err.println("Interrupted thread!");
        } 
        catch (ExecutionException ex) {
            ex.printStackTrace();
            System.err.println("Interrupted thread!");
        }
        finally{ 
            exec.shutdownNow();
        }

    /*Converting CRC sum and total length to bytes for trailer*/
    byte[] trailer = new byte[8];
    getTrailer trail = new getTrailer(checksum.getValue(),uncompressedLength);
    trail.writeTrailer(trailer,0);
    System.out.write(trailer);

    }
    catch (IOException ioe) {
        ioe.printStackTrace();
        System.out.println("IO error.");
        System.exit(-1);    
    }
    catch (Throwable e) {
        System.out.println("Unexpected exception or error.");
        System.exit(-1);
    }
    }
}

啊,哎呀,格式被代码块格式弄丢了。

如您所见,我一直从输入中读取,直到 buff 已满。原因是因为这不是一个文件,所以第一次调用 read 可能没有读取足够的字节来填充数组(给我留下一堆我不想弄乱任何东西的空值)。一旦它满了,我把它交给 Executor,这样线程就会执行任务。我实现了 Callable 而不是 Runnable 以便我可以将输出作为字节数组返回,并且因为我需要未来的接口。exec.get() 方法允许我保持线程同步。我已经用任意情况进行了测试(打印出数字 1 - 100 以确保,它们确实按顺序打印)。

有一个缺陷是该程序无法处理 BLOCK_SIZE 的倍数的文件,但这甚至不是我现在遇到的问题。当输入足够小以至于我只运行一个线程时,这个程序就可以工作。

对于除最后一个块之外的每个块,我使用 SYNC_FLUSH 选项调用 deflate。这样我就可以在字节边界上结束。我正常压缩并调用完成的最后一个块。

对不起,很长的帖子。老实说,除了我自己的意见之外,我还需要更多的意见,因为我似乎找不到错误。万一有人想编译并运行它自己看看,这里是我拥有的其他类(只是为了获取进程数并生成预告片。这两个都很好)。

import java.io.*;

public class getTrailer {
    private long crc;
    private int total;
    public getTrailer (long crcVal, int totalIn) {
        this.crc = crcVal;
        this.total = totalIn;
    } 
    public void writeTrailer(byte[] buf, int offset) throws IOException {
        writeInt((int)crc, buf, offset); // CRC-32 of uncompr. data
        writeInt(total, buf, offset + 4); // Number of uncompr. bytes
    }

    /* Writes integer in Intel byte order to a byte array, starting at a
    * given offset
    */

    public void writeInt(int i, byte[] buf, int offset) throws IOException {
        writeShort(i & 0xffff, buf, offset);
        writeShort((i >> 16) & 0xffff, buf, offset + 2);
    }

   /*
    * Writes short integer in Intel byte order to a byte array, starting
    * at a given offset
    */

    public void writeShort(int s, byte[] buf, int offset) throws IOException {
        buf[offset] = (byte)(s & 0xff);
        buf[offset + 1] = (byte)((s >> 8) & 0xff);
    }
}

预告片功能实际上是从 JAva 文档中复制粘贴的

public class getProcessors {
    private Runtime runner = Runtime.getRuntime();
    private int nProcs = runner.availableProcessors();

    int getNumProcs() {
        return nProcs;
    }
}

我知道这有多长,但我真的需要别人的意见。如果有人看到任何他们认为可能导致问题的东西,请告诉我。我不需要有人为我编写程序(我想我快到了)但我只是......看不出有什么问题。

4

2 回答 2

0

所以,我的第一个猜测是你正在以错误的字节顺序编写 CRC。这似乎是您第一次一次写入 4 个字节。

于 2012-10-30T04:53:54.803 回答
0

如果您正在为一门课程这样做,并且您所展示的内容与所接受的内容相似,那么我希望该课程都是关于结构化的过程编程的,因为您所展示的内容以及面向对象的解决方案看起来就像是英里分开。

你的评论,

“exec.get() 方法允许我保持线程同步。我已经用任意情况进行了测试(打印出数字 1 - 100 以确保它们确实按顺序打印)。”

这与人们对多线程解决方案的期望完全相反。多线程解决方案将以完全不可预测的顺序输出数字 1-100。它按顺序出现意味着您已经同步了多线程的所有好处。在继续之前等待缓冲区填满立即让我觉得有问题。

根据责任将解决方案分成几类。您正在对动作类建模(即 getProcessors、getTrailer),这是错误的。不要根据活动或状态对类进行建模。大多数时候,简单地谈论你正在尝试做的事情将产生正确的类(即我们有一些输入数据、一个压缩器、一个解压缩器、某种工作队列、一个预告片等。如果你需要操作一个处理器列表,然后有一个处理器类包装(有一个,不是一个)列表。每个类在整个解决方案中都有特定的责任,每个类仅在其自身上运行(没有公共访问器)。当每个类能够在独立测试中执行其功能,那么您就可以在多线程解决方案中使用它们的实例。

如果您创建一个由您认为在解决方案中的类组成的域模型,然后通过向适当的类添加方法来开始对功能进行建模,那么模型本身将开始通知您应该如何对交互进行编码。提示:构造函数可以接受低级结构作为参数,其他方法不应该。

最重要的是,不要线性思考 - 你有一个 Main() 方法,它从上到下执行处理 - BUZZZZZ。不正确的反应。解决方案应该是一组类之间交互的症状,每个类都提供了整个解决方案的一个独特而独立的部分。

最好的多线程解决方案不需要同步 - 线程是谨慎的,并且能够以尽可能高的速度运行。实现这一点的一种简单方法是确保每个线程使用其自己的任何相关类的实例——不要使用共享内存。如果输出端需要同步,那么线程应该将它们的结果转储到一个类中,该类将作为输出前的最后一步执行排序。

最后,你确定你是正确的吗?我认为我更可能希望在不同的数据源上启动多个实例,而不是在一个数据源上启动多个线程。我们知道每个源都需要从头到尾进行完全处理——问题变成了我们是否希望每个源尽快处理,或者我们是否希望能够同时处理多个源,这样我们就不关心如何处理每个可能需要很长时间。

在单个线程上执行单个源的处理允许我同时处理多个源,并且在解决方案方面非常简单 - 迹象表明这是执行多线程的好地方。

对单个源执行多线程处理会增加相当大的复杂性,并且由于数据的性质(必须按输入顺序),多线程并不是一个好的解决方案。

于 2013-08-12T18:27:00.430 回答