所以这是一个我有点碰壁的任务。在这一点上,我主要希望有更多的眼睛,因为经过多次重做和改进后,我真的看不出我的代码有任何问题。
我们需要用 Java 编写一个可以通过 gzip -d 调用正确解压缩的多线程压缩器。我们不能使用 GZIPOutputStream 调用。相反,我们手动生成了标头和尾标,并使用 Deflater 来压缩数据。我们从标准输入读取并写入标准输出。
基本上,我使用和 Executor 来维护一个线程池。我在输入时读取输入并将其写入设定大小的缓冲区。一旦缓冲区已满,我将该数据块传递给线程(将任务放入队列)。每个线程都有自己的 Deflater,并传递输入和压缩该数据所需的任何其他信息。我还将每个块的最后 32Kb 用作下一个块的字典。
我已经确认我的标题和预告片是正确的。我使用 GZIPOutputStream 压缩文件并使用 hexdump 获取字节,以便将其与输出进行比较。我检查了不同大小的文件,并且标题和尾部是相同的,所以很可能问题出在压缩数据中。我得到的错误是:无效的压缩数据--crc 错误
我已经确认,当我传入一个相对较小的输入时(因此只有一个线程,因为我从未填满缓冲区,队列中只有一个任务)输出是正确的。我可以在压缩数据上调用 gzip -d 并取回完全相同的输入。
换句话说,问题在于当有足够多的数据启动并运行多个线程时。我在大文件的输出上使用了 hexdump 并将其与 GZIPOutputStream 的 hexdump 进行比较,它们非常相似(不完全相同,但即使是小文件的情况,对于压缩数据,hexdump 也略有不同。并且在在这种情况下, gzip -d 仍然有效)。这也是我知道标题和预告片正确的方式。
传入代码转储
import java.lang.Runtime;
import java.lang.String;
import java.lang.Integer;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.nio.ByteBuffer;
import java.io.*;
import java.util.zip.*;
/*Warning: Do not compress files larger than 2GB please. Since this is just
an assignment and not meant to replace an actual parallel compressor, I cut corners
by casting longs to ints, since it's easier to convert to 4 bytes*/
public class Main {
private static final int BLOCK_SIZE = 128*1024;
private static final int DICT_SIZE = 32*1024;
private static byte[] header = {(byte)0x1f, (byte)0x8b, (byte)0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
public static void main(String[] args){
class workerThread implements Callable<byte[]> {
private boolean lastBlock;
private boolean dictAvailable;
private byte[] input;
private byte[] dictionary;
private int lastSize;
private byte[] output = new byte[BLOCK_SIZE];
private int compressedLength;
private ByteArrayOutputStream bOut = new ByteArrayOutputStream();
Deflater compress = new Deflater (Deflater.DEFAULT_COMPRESSION, true);
workerThread(boolean last, byte[] blockIn, byte[] dict, boolean useDictionary, int lastBSize){
this.lastBlock = last;
this.input = blockIn;
this.dictionary = dict;
this.dictAvailable = useDictionary;
this.lastSize = lastBSize;
}
public byte[] call() {
//System.out.println("running thread ");
if (lastBlock) {
// System.out.println("Last block!");
compress.setInput(input,0,lastSize);
if(dictAvailable) {
compress.setDictionary(dictionary);
}
compress.finish();
compressedLength = compress.deflate(output,0,BLOCK_SIZE,Deflater.SYNC_FLUSH);
}
else {
//System.out.println("Not last block!");
compress.setInput(input,0,BLOCK_SIZE);
if(dictAvailable) {
compress.setDictionary(dictionary);
}
compressedLength = compress.deflate(output,0,BLOCK_SIZE,Deflater.SYNC_FLUSH);
}
byte[] finalOut = Arrays.copyOfRange(output,0,compressedLength);
return finalOut;
}
}
getProcessors p = new getProcessors();
boolean useDict = true;
int numProcs = p.getNumProcs();
boolean customProcs = false;
boolean foundProcs = false;
boolean foundDict = false;
/*Checking if arguments are correct*/
........
/*Correct arguments, proceeding*/
BufferedInputStream inBytes = new BufferedInputStream(System.in);
byte[] buff = new byte[BLOCK_SIZE];
byte[] dict = new byte[DICT_SIZE];
int bytesRead = 0;
int offset = 0;
int uncompressedLength = 0;
int lastBlockSize = 0;
boolean isLastBlock = false;
boolean firstBlockDone = false;
/*Using an executor with a fixed thread pool size in order to manage threads
as well as obtain future results to maintain synchronization*/
ExecutorService exec = Executors.newFixedThreadPool(numProcs);
CRC32 checksum = new CRC32();
checksum.reset();
List<Future<byte[]>> results = new ArrayList<Future<byte[]>>();
//byte[] temp;
System.out.write(header,0,header.length);
try{
bytesRead = inBytes.read(buff,0, BLOCK_SIZE);
while (bytesRead != -1) {
uncompressedLength += bytesRead;
checksum.update(buff,offset,bytesRead);
offset += bytesRead;
if (offset == BLOCK_SIZE) {
offset = 0;
if(!firstBlockDone){
firstBlockDone = true;
results.add(exec.submit(new workerThread(isLastBlock,buff,dict,false,lastBlockSize)));
}
else {
results.add(exec.submit(new workerThread(isLastBlock,buff,dict,useDict,lastBlockSize)));
}
if (useDict) {
System.arraycopy(buff, BLOCK_SIZE-DICT_SIZE, dict, 0, DICT_SIZE);
}
}
/*Implementation warning! Because of the way bytes are read in, this program will fail if
the file being zipped is exactly a multiple of 128*1024*/
if((bytesRead=inBytes.read(buff,offset,BLOCK_SIZE-offset)) == -1) {
isLastBlock = true;
lastBlockSize = offset;
results.add(exec.submit(new workerThread(isLastBlock,buff,dict,useDict,lastBlockSize)));
}
}
try {
for(Future<byte[]> result: results) {
//System.out.println("Got result!");
System.out.write(result.get(),0,result.get().length);
//temp = result.get();
}
}
catch (InterruptedException ex) {
ex.printStackTrace();
System.err.println("Interrupted thread!");
}
catch (ExecutionException ex) {
ex.printStackTrace();
System.err.println("Interrupted thread!");
}
finally{
exec.shutdownNow();
}
/*Converting CRC sum and total length to bytes for trailer*/
byte[] trailer = new byte[8];
getTrailer trail = new getTrailer(checksum.getValue(),uncompressedLength);
trail.writeTrailer(trailer,0);
System.out.write(trailer);
}
catch (IOException ioe) {
ioe.printStackTrace();
System.out.println("IO error.");
System.exit(-1);
}
catch (Throwable e) {
System.out.println("Unexpected exception or error.");
System.exit(-1);
}
}
}
啊,哎呀,格式被代码块格式弄丢了。
如您所见,我一直从输入中读取,直到 buff 已满。原因是因为这不是一个文件,所以第一次调用 read 可能没有读取足够的字节来填充数组(给我留下一堆我不想弄乱任何东西的空值)。一旦它满了,我把它交给 Executor,这样线程就会执行任务。我实现了 Callable 而不是 Runnable 以便我可以将输出作为字节数组返回,并且因为我需要未来的接口。exec.get() 方法允许我保持线程同步。我已经用任意情况进行了测试(打印出数字 1 - 100 以确保,它们确实按顺序打印)。
有一个缺陷是该程序无法处理 BLOCK_SIZE 的倍数的文件,但这甚至不是我现在遇到的问题。当输入足够小以至于我只运行一个线程时,这个程序就可以工作。
对于除最后一个块之外的每个块,我使用 SYNC_FLUSH 选项调用 deflate。这样我就可以在字节边界上结束。我正常压缩并调用完成的最后一个块。
对不起,很长的帖子。老实说,除了我自己的意见之外,我还需要更多的意见,因为我似乎找不到错误。万一有人想编译并运行它自己看看,这里是我拥有的其他类(只是为了获取进程数并生成预告片。这两个都很好)。
import java.io.*;
public class getTrailer {
private long crc;
private int total;
public getTrailer (long crcVal, int totalIn) {
this.crc = crcVal;
this.total = totalIn;
}
public void writeTrailer(byte[] buf, int offset) throws IOException {
writeInt((int)crc, buf, offset); // CRC-32 of uncompr. data
writeInt(total, buf, offset + 4); // Number of uncompr. bytes
}
/* Writes integer in Intel byte order to a byte array, starting at a
* given offset
*/
public void writeInt(int i, byte[] buf, int offset) throws IOException {
writeShort(i & 0xffff, buf, offset);
writeShort((i >> 16) & 0xffff, buf, offset + 2);
}
/*
* Writes short integer in Intel byte order to a byte array, starting
* at a given offset
*/
public void writeShort(int s, byte[] buf, int offset) throws IOException {
buf[offset] = (byte)(s & 0xff);
buf[offset + 1] = (byte)((s >> 8) & 0xff);
}
}
预告片功能实际上是从 JAva 文档中复制粘贴的
public class getProcessors {
private Runtime runner = Runtime.getRuntime();
private int nProcs = runner.availableProcessors();
int getNumProcs() {
return nProcs;
}
}
我知道这有多长,但我真的需要别人的意见。如果有人看到任何他们认为可能导致问题的东西,请告诉我。我不需要有人为我编写程序(我想我快到了)但我只是......看不出有什么问题。