1

我编写了一个自定义记录阅读器来读取 Hadoop 中的文本和 gzip 文件,因为我有一个特殊要求,即拥有完整的文件数据作为键的值和文件名。来源如下:

public class WholeFileRecordReader extends RecordReader<Text, BytesWritable> {

    private CompressionCodecFactory compressionCodecs = null;
    private FileSplit fileSplit;
    private Configuration conf;
    private InputStream in;
    private Text key = new Text("");
    private BytesWritable value = new BytesWritable();
    private boolean processed = false;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {

        this.fileSplit = (FileSplit) split;
        this.conf = context.getConfiguration();

        final Path file = fileSplit.getPath();
        compressionCodecs = new CompressionCodecFactory(conf);

        final CompressionCodec codec = compressionCodecs.getCodec(file);
        System.out.println(codec);
        FileSystem fs = file.getFileSystem(conf);
        in = fs.open(file);

        if (codec != null) {
            in = codec.createInputStream(in);
        }
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed) {
            byte[] contents = new byte[(int) fileSplit.getLength()];
            Path file = fileSplit.getPath();
            key.set(file.getName());

            try {
                IOUtils.readFully(in, contents, 0, contents.length);
                value.set(contents, 0, contents.length);
            } finally {
                IOUtils.closeStream(in);
            }

            processed = true;
            return true;
        }

        return false;
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException {
        return processed ? 1.0f : 0.0f;
    }

    @Override
    public void close() throws IOException {
        // Do nothing
    }

}

问题是我的代码正在读取不完整的文件数据。这可能是因为我使用 fileSplit(指向压缩文件)来确定内容的长度,因此我得到的值较小。因此,这会导致将不完整的数据传递给 Mapper。

有人可以指出如何获取 gizipped 文件数据的实际长度或修改 RecordReader 以使其读取完整数据。

4

2 回答 2

1

扩展@Chris White 的答案,我不得不对他给出的代码进行某些语法更改。如下:

fileLength = (int) fileSplit.getLength();
compressionCodecs = new CompressionCodecFactory(conf);

final CompressionCodec codec = compressionCodecs.getCodec(file);
FileSystem fs = file.getFileSystem(conf);
in = fs.open(file);

if (codec != null) {
    if (codec instanceof GzipCodec) {
        byte[] len = new byte[4];
        try {
            in.skip(fileLength - 4);
            IOUtils.readFully(in, len, 0, len.length);
            fileLength = (len[3] << 24) | (len[2] << 16) + (len[1] << 8) + len[0];
        } finally {
            in.close();
        }
    }

    in = fs.open(file);
    in = codec.createInputStream(in);
}

非常感谢@Chris White 的投入。没有你就做不到:)

于 2013-01-29T06:57:02.450 回答
0

对于 GZip 文件,您可以跳到最后 4 个字节(根据规范),应该返回原始的未压缩文件大小)。请注意,该值是 2^32 的模,因此如果您希望原始文件大于此值,请小心。

因此,您的初始化方法可以修改为类似于此的内容(未经测试!):

final CompressionCodec codec = compressionCodecs.getCodec(file);
System.out.println(codec);
FileSystem fs = file.getFileSystem(conf);
in = fs.open(file);

length = fileSplit.getLength();
if (codec instanceof GZipCodec) {
  // skip to last 4 bytes
  in.seek(length-4);

  // read size
  length = in.readInt();

  // reset stream position
  in.seek(0);
}

现在您应该有实际的文件长度(对于未压缩和 Gzip 压缩),您可以在 nextKeyValue() 方法中使用它。

于 2013-01-25T12:07:42.630 回答