4

我正在编写一个 MapReduce 代码,其中我必须读取文件名作为键,文件内容作为它的值。为此,我在 StackOverflow 上发布了这个问题。它适用于文本文件,但开始出现 gzip 文件的问题。因此,参考LineRecordReader类,我对代码进行了一些修改。代码片段是:

public class WholeFileRecordReader extends RecordReader<Text, BytesWritable> {

    private CompressionCodecFactory compressionCodecs = null;
    private FileSplit fileSplit;
    private Configuration conf;
    private InputStream in;
    private Text key = new Text("");
    private BytesWritable value = new BytesWritable();
    private boolean processed = false;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {

        this.fileSplit = (FileSplit) split;
        this.conf = context.getConfiguration();

        final Path file = fileSplit.getPath();
        compressionCodecs = new CompressionCodecFactory(conf);

        final CompressionCodec codec = compressionCodecs.getCodec(file);
        System.out.println(codec);
        FileSystem fs = file.getFileSystem(conf);
        in = fs.open(file);

        if (codec != null) {
            in = codec.createInputStream(in);
        }
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed) {
            byte[] contents = new byte[(int) fileSplit.getLength()];
            Path file = fileSplit.getPath();
            key.set(file.getName());

            try {
                IOUtils.readFully(in, contents, 0, contents.length);
                value.set(contents, 0, contents.length);
            } finally {
                IOUtils.closeStream(in);
            }

            processed = true;
            return true;
        }

        return false;
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException {
        return processed ? 1.0f : 0.0f;
    }

    @Override
    public void close() throws IOException {
        // Do nothing
    }

}

问题是我正在获取codecobject的值,就null好像该文件是gz文件一样。需要注意的一件事是,出于我自己的目的,我在最后附加了带有日期的文件。但我觉得这应该不是问题,因为我听说 Unix 不使用扩展名来确定文件类型。

有人可以告诉我有什么问题吗?

4

1 回答 1

1

CompressionCodecFactory 确实使用文件扩展名来确定要使用的编解码器 - 因此,如果文件以结尾,.gz则在调用时应返回 GzipCodec getCodec。如果您有.gz.2012-01-24扩展名,那么这将不会返回 gzip 编解码器。因此,您需要修改文件命名约定以交换日期和扩展名。

于 2013-01-24T11:39:51.327 回答