2

我有一个巨大的文本文件,我想拆分文件,以便每个块有 5 行。我实现了自己的 GWASInputFormat 和 GWASRecordReader 类。但是我的问题是,在以下代码中(我从http://bigdatacircus.com/2012/08/01/wordcount-with-custom-record-reader-of-textinputformat/复制),在 initialize() 方法中我有以下几行

FileSplit split = (FileSplit) genericSplit;
final Path file = split.getPath();
Configuration conf = context.getConfiguration();

我的问题是,在我的 GWASRecordReader 类中调用 initialize() 方法时,文件是否已经拆分?我以为我是在 GWASRecordReader 类中做的(拆分)。让我知道我的思考过程是否就在这里。

package com.test;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;

public class GWASRecordReader extends RecordReader<LongWritable, Text> {

private final int NLINESTOPROCESS = 5;
private LineReader in;
private LongWritable key;
private Text value = new Text();
private long start = 0;
private long pos = 0;
private long end = 0;
private int maxLineLength;

public void close() throws IOException {
    if(in != null) {
        in.close();
    }
}

public LongWritable getCurrentKey() throws IOException, InterruptedException {
    return key;
}

public Text getCurrentValue() throws IOException, InterruptedException {
    return value;
}

public float getProgress() throws IOException, InterruptedException {
    if(start == end) {
        return 0.0f;
    }
    else {
        return Math.min(1.0f, (pos - start)/(float) (end - start));
    }
}

public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
    FileSplit split = (FileSplit) genericSplit;
    final Path file = split.getPath();
    Configuration conf = context.getConfiguration();
    this.maxLineLength = conf.getInt("mapred.linerecordreader.maxlength",Integer.MAX_VALUE);
    FileSystem fs = file.getFileSystem(conf);
    start = split.getStart();
    end = start + split.getLength();
    System.out.println("---------------SPLIT LENGTH---------------------" + split.getLength());
    boolean skipFirstLine = false;
    FSDataInputStream filein = fs.open(split.getPath());

    if(start != 0) {
        skipFirstLine = true;
        --start;
        filein.seek(start);
    }

    in = new LineReader(filein, conf);
    if(skipFirstLine) {
        start += in.readLine(new Text(),0,(int)Math.min((long)Integer.MAX_VALUE, end - start));
    }
    this.pos = start;
}

public boolean nextKeyValue() throws IOException, InterruptedException {
    if (key == null) {
        key = new LongWritable();
    }

    key.set(pos);

    if (value == null) {
        value = new Text();
    }
    value.clear();
    final Text endline = new Text("\n");
    int newSize = 0;
    for(int i=0; i<NLINESTOPROCESS;i++) {
        Text v = new Text();
        while( pos < end) {
            newSize = in.readLine(v ,maxLineLength, Math.max((int)Math.min(Integer.MAX_VALUE, end - pos), maxLineLength));
            value.append(v.getBytes(), 0, v.getLength());
            value.append(endline.getBytes(),0,endline.getLength());
            if(newSize == 0) {
                break;
            }
            pos += newSize;
            if(newSize < maxLineLength) {
                break;
            }
        }
    }

    if(newSize == 0) {
        key = null;
        value = null;
        return false;
    } else {
        return true;
    }
}
}
4

1 回答 1

7

是的,输入文件已经被拆分。它基本上是这样的:

your input file(s) -> InputSplit -> RecordReader -> Mapper...

基本上,InputSplit将输入分解为块,RecordReader将这些块分解为键/值对。请注意,InputSplitRecordReader将由InputFormat您使用的决定。例如,TextInputFormat用于FileSplit拆分输入,然后LineRecordReader以位置为键处理每一行,并将行本身作为值。所以在你的GWASInputFormat你需要看看FileSplit你用什么样的来看看它传递给什么GWASRecordReader

我建议查看NLineInputFormat哪个“将 N 行输入拆分为一个拆分”。它可能能够完全按照您自己的意愿去做。

如果您尝试一次获取 5 行作为值,并将第一行的行号作为键,我会说您可以使用自定义NLineInputFormat和 custom来做到这一点LineRecordReader。我认为您不必担心输入拆分,因为输入格式可以将其拆分为那 5 行块。您RecordReader将与 非常相似LineRecordReader,但不是获取块开始的字节位置,而是获取行号。所以除了那个小改动之外,代码几乎是相同的。所以你基本上可以复制和粘贴NLineInputFormatLineRecordReader然后让输入格式使用你的记录阅读器来获取行号。代码将非常相似。

于 2012-11-12T17:52:28.173 回答