3

我正在使用 Hadoop 0.20.2,并且正在使用旧 API。我正在尝试将大块数据发送到映射器,而不是一次发送一行(数据涵盖多行)。我尝试使用 NLineInputFormat 设置一次获取多少行,但映射器一次仍然只接收 1 行。我很确定我有正确的代码。是否有任何原因导致这无法正常工作?

供你参考,

JobConf conf = new JobConf(WordCount.class);

conf.setInt("mapred.line.input.format.linespermap", 2);

conf.setInputFormat(NLineInputFormat.class);

基本上,我使用来自http://hadoop.apache.org/common/docs/r0.20.2/mapred_tutorial.html#Example%3A+WordCount+v1.0的示例代码,仅更改 TextInputFormat。

提前致谢

4

2 回答 2

4

NLineInputFormat 旨在确保所有映射器都接收相同数量的输入记录(每个文件拆分的最后部分除外)。

因此,通过将输入属性更改为 2,每个映射器应该(最多)接收 2 个输入对,而不是一次接收 2 个输入行(这是我认为您正在寻找的)。

您应该能够通过查看每个映射任务的计数器来确认这一点,“映射输入记录”应该为大多数映射器报告 2

于 2012-06-12T18:01:09.927 回答
0

我最近通过简单地创建自己的 InputFormat 来解决这个问题,它覆盖 NLineInputFormat 并实现自定义 MultiLineRecordReader 而不是默认的 LineReader。

我选择扩展 NLineInputFormat 是因为我希望获得相同的保证,即每次拆分恰好有 N 行。

这个记录阅读器几乎是从http://bigdatacircus.com/2012/08/01/wordcount-with-custom-record-reader-of-textinputformat/

我唯一修改的是maxLineLength现在使用新 API 的属性,它的值是NLINESTOPROCESS从 NLineInputFormat 的setNumLinesPerSplit()硬编码中读取的(以获得更大的灵活性)。

结果如下:

public class MultiLineInputFormat extends NLineInputFormat{
    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit genericSplit, TaskAttemptContext context) {
        context.setStatus(genericSplit.toString());
        return new MultiLineRecordReader();
    }

    public static class MultiLineRecordReader extends RecordReader<LongWritable, Text>{
        private int NLINESTOPROCESS;
        private LineReader in;
        private LongWritable key;
        private Text value = new Text();
        private long start =0;
        private long end =0;
        private long pos =0;
        private int maxLineLength;

        @Override
        public void close() throws IOException {
            if (in != null) {
                in.close();
            }
        }

        @Override
        public LongWritable getCurrentKey() throws IOException,InterruptedException {
            return key;
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return value;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            if (start == end) {
                return 0.0f;
            }
            else {
                return Math.min(1.0f, (pos - start) / (float)(end - start));
            }
        }

        @Override
        public void initialize(InputSplit genericSplit, TaskAttemptContext context)throws IOException, InterruptedException {
            NLINESTOPROCESS = getNumLinesPerSplit(context);
            FileSplit split = (FileSplit) genericSplit;
            final Path file = split.getPath();
            Configuration conf = context.getConfiguration();
            this.maxLineLength = conf.getInt("mapreduce.input.linerecordreader.line.maxlength",Integer.MAX_VALUE);
            FileSystem fs = file.getFileSystem(conf);
            start = split.getStart();
            end= start + split.getLength();
            boolean skipFirstLine = false;
            FSDataInputStream filein = fs.open(split.getPath());

            if (start != 0){
                skipFirstLine = true;
                --start;
                filein.seek(start);
            }
            in = new LineReader(filein,conf);
            if(skipFirstLine){
                start += in.readLine(new Text(),0,(int)Math.min((long)Integer.MAX_VALUE, end - start));
            }
            this.pos = start;
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (key == null) {
                key = new LongWritable();
            }
            key.set(pos);
            if (value == null) {
                value = new Text();
            }
            value.clear();
            final Text endline = new Text("\n");
            int newSize = 0;
            for(int i=0;i<NLINESTOPROCESS;i++){
                Text v = new Text();
                while (pos < end) {
                    newSize = in.readLine(v, maxLineLength,Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),maxLineLength));
                    value.append(v.getBytes(),0, v.getLength());
                    value.append(endline.getBytes(),0, endline.getLength());
                    if (newSize == 0) {
                        break;
                    }
                    pos += newSize;
                    if (newSize < maxLineLength) {
                        break;
                    }
                }
            }
            if (newSize == 0) {
                key = null;
                value = null;
                return false;
            } else {
                return true;
            }
        }
    }

}
于 2015-04-29T06:27:55.727 回答