我的 MapReduce 必须从 HBase 读取记录并且需要写入 zip 文件。我们的客户特别要求减速器输出文件应该只是.zip
文件。
为此,我编写了ZipFileOutputFormat
包装器来压缩记录并写入 zip 文件。
此外,我们不能使用缓冲区并将所有行保存到缓冲区然后迭代,因为某些文件包含 19GB 的记录,那时它会抛出一个java.lang.OutOfMemoryError
.
一切似乎都很好,但有一个问题:
该.zip
文件正在为每个键创建。在我的输出文件中,我可以看到许多输出文件,这些文件是每个行键的分隔文件。我不知道如何将它组合到 zip 文件中。
这是我的实现ZipFileOutputFormat.java
public class ZipFileOutputFormat<K, V> extends FileOutputFormat<K, V> {
public static class ZipRecordWriter<K, V> extends org.apache.hadoop.mapreduce.RecordWriter<K, V> {
private ZipOutputStream zipOut;
public ZipRecordWriter(FSDataOutputStream fileOut) {
zipOut = new ZipOutputStream(fileOut);
}
@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
// TODO Auto-generated method stub
zipOut.closeEntry();
zipOut.finish();
zipOut.close();
zipOut.flush();
}
@Override
public void write(K key, V value) throws IOException {
String fname = null;
if (key instanceof BytesWritable) {
BytesWritable bk = (BytesWritable) key;
fname = new String(bk.getBytes(), 0, bk.getLength());
} else {
fname = key.toString();
}
ZipEntry ze = new ZipEntry(fname);
zipOut.closeEntry();
zipOut.putNextEntry(ze);
if (value instanceof BytesWritable) {
zipOut.write(((BytesWritable) value).getBytes(), 0, ((BytesWritable) value).getLength());
} else {
zipOut.write(value.toString().getBytes());
}
}
}
//
// @Override
// public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf
// job,
// String name, Progressable progress) throws IOException {
// Path file = FileOutputFormat.getTaskOutputPath(job, name);
// FileSystem fs = file.getFileSystem(job);
// FSDataOutputStream fileOut = fs.create(file, progress);
// return new ZipRecordWriter<K, V>(fileOut);
// }
@Override
public org.apache.hadoop.mapreduce.RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
Configuration conf = job.getConfiguration();
getOutputCommitter(job);
getOutputName(job);
Path file = getDefaultWorkFile(job, ".zip");
// Path file = new Path(committer.getWorkPath()+"/"+fileName);
FileSystem fs = file.getFileSystem(conf);
FSDataOutputStream fileOut = fs.create(file);
return new ZipRecordWriter<K, V>(fileOut);
}
}