5

我的 MapReduce 必须从 HBase 读取记录并且需要写入 zip 文件。我们的客户特别要求减速器输出文件应该只是.zip文件。

为此,我编写了ZipFileOutputFormat包装器来压缩记录并写入 zip 文件。

此外,我们不能使用缓冲区并将所有行保存到缓冲区然后迭代,因为某些文件包含 19GB 的记录,那时它会抛出一个java.lang.OutOfMemoryError.

一切似乎都很好,但有一个问题:

.zip文件正在为每个键创建。在我的输出文件中,我可以看到许多输出文件,这些文件是每个行键的分隔文件。我不知道如何将它组合到 zip 文件中。

这是我的实现ZipFileOutputFormat.java

public class ZipFileOutputFormat<K, V> extends FileOutputFormat<K, V> {

    public static class ZipRecordWriter<K, V> extends org.apache.hadoop.mapreduce.RecordWriter<K, V> {

        private ZipOutputStream zipOut;

        public ZipRecordWriter(FSDataOutputStream fileOut) {
            zipOut = new ZipOutputStream(fileOut);
        }
        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            zipOut.closeEntry();
            zipOut.finish();
            zipOut.close();
            zipOut.flush();
        }
        @Override
        public void write(K key, V value) throws IOException {
            String fname = null;
            if (key instanceof BytesWritable) {
                BytesWritable bk = (BytesWritable) key;
                fname = new String(bk.getBytes(), 0, bk.getLength());
            } else {
                fname = key.toString();
            }
            ZipEntry ze = new ZipEntry(fname);
            zipOut.closeEntry();
            zipOut.putNextEntry(ze);

            if (value instanceof BytesWritable) {
                zipOut.write(((BytesWritable) value).getBytes(), 0, ((BytesWritable) value).getLength());
            } else {
                zipOut.write(value.toString().getBytes());
            }

        }

    }

    //
    // @Override
    // public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf
    // job,
    // String name, Progressable progress) throws IOException {
    // Path file = FileOutputFormat.getTaskOutputPath(job, name);
    // FileSystem fs = file.getFileSystem(job);
    // FSDataOutputStream fileOut = fs.create(file, progress);
    // return new ZipRecordWriter<K, V>(fileOut);
    // }

    @Override
    public org.apache.hadoop.mapreduce.RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        // TODO Auto-generated method stub
        Configuration conf = job.getConfiguration();
        getOutputCommitter(job);

        getOutputName(job);

        Path file = getDefaultWorkFile(job, ".zip");
        // Path file = new Path(committer.getWorkPath()+"/"+fileName);

        FileSystem fs = file.getFileSystem(conf);
        FSDataOutputStream fileOut = fs.create(file);

        return new ZipRecordWriter<K, V>(fileOut);

    }
}
4

0 回答 0