I want to use the CombineFileInputFormat of Hadoop 0.20.0 / 0.20.2 such that it processes 1 file per record and does not compromise on data locality (which it normally takes care of).
It is mentioned in Tom White's Hadoop Definitive Guide, but he does not show how to do it; instead, he moves on to sequence files.
I am pretty confused about the meaning of the processed variable in the record reader. Any code example would be of great help.
Thanks in advance.
Check the following input format, which combines multiple files into each split:
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

/**
 * Custom input format which implements createRecordReader of the abstract
 * class CombineFileInputFormat. One MyRecordReader is created per file in
 * the combined split; each delegates the actual line reading to a
 * LineRecordReader pointed at its own file.
 */
public class MyCombineFileInputFormat
        extends CombineFileInputFormat<LongWritable, Text> {

    public static class MyRecordReader extends RecordReader<LongWritable, Text> {
        private final LineRecordReader delegate = new LineRecordReader();
        // Index of the file inside the CombineFileSplit that this reader handles.
        private final int idx;

        // The (CombineFileSplit, TaskAttemptContext, Integer) constructor
        // signature is required by CombineFileRecordReader, which creates
        // these readers reflectively.
        public MyRecordReader(CombineFileSplit split, TaskAttemptContext context,
                Integer idx) throws IOException {
            this.idx = idx;
        }

        @Override
        public void initialize(InputSplit split, TaskAttemptContext context)
                throws IOException {
            // Carve the idx-th file out of the combined split and hand it to
            // the delegate as an ordinary FileSplit.
            CombineFileSplit combineSplit = (CombineFileSplit) split;
            FileSplit fileSplit = new FileSplit(combineSplit.getPath(idx),
                    combineSplit.getOffset(idx), combineSplit.getLength(idx),
                    combineSplit.getLocations());
            delegate.initialize(fileSplit, context);
        }

        @Override
        public LongWritable getCurrentKey() throws IOException, InterruptedException {
            return delegate.getCurrentKey();
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return delegate.getCurrentValue();
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            return delegate.nextKeyValue();
        }

        @Override
        public float getProgress() throws IOException {
            return delegate.getProgress();
        }

        @Override
        public void close() throws IOException {
            delegate.close();
        }
    }

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException {
        // CombineFileRecordReader instantiates MyRecordReader once per file in
        // the split, passing the file index as the third constructor argument.
        return new CombineFileRecordReader<LongWritable, Text>(
                (CombineFileSplit) split, context, MyRecordReader.class);
    }
}
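Note that the reader above still emits one record per line. To get one record per whole file, as the question asks, the usual pattern (the one behind the book's processed variable) is a reader that emits a single (filename, contents) pair and uses a boolean flag to remember it has already done so. A minimal sketch, assuming each file fits in memory; the class name and the Text/BytesWritable key/value choice are illustrative, not part of the answer above:

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

public class WholeFileRecordReader extends RecordReader<Text, BytesWritable> {
    private final int idx;              // which file of the combined split we own
    private CombineFileSplit split;
    private TaskAttemptContext context;
    private boolean processed = false;  // true once our single record has been emitted
    private final Text key = new Text();
    private final BytesWritable value = new BytesWritable();

    public WholeFileRecordReader(CombineFileSplit split, TaskAttemptContext context,
            Integer idx) {
        this.idx = idx;
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) {
        this.split = (CombineFileSplit) split;
        this.context = context;
    }

    @Override
    public boolean nextKeyValue() throws IOException {
        if (processed) {
            return false;               // the one record was already returned
        }
        // Read the whole file into memory and emit it as a single record.
        Path path = split.getPath(idx);
        FileSystem fs = path.getFileSystem(context.getConfiguration());
        byte[] contents = new byte[(int) split.getLength(idx)];
        FSDataInputStream in = fs.open(path);
        try {
            IOUtils.readFully(in, contents, 0, contents.length);
        } finally {
            IOUtils.closeStream(in);
        }
        key.set(path.toString());
        value.set(contents, 0, contents.length);
        processed = true;
        return true;                    // exactly one record per file
    }

    @Override
    public Text getCurrentKey() { return key; }

    @Override
    public BytesWritable getCurrentValue() { return value; }

    @Override
    public float getProgress() { return processed ? 1.0f : 0.0f; }

    @Override
    public void close() { }
}

Plugged into a CombineFileInputFormat<Text, BytesWritable> exactly the way MyRecordReader is above, this yields one map input record per file while still packing many files into each locality-aware split.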
Here is the simplest way to use CombineFileInputFormat from the so-called new API. Suppose your actual input format is MyFormat and it works with keys of type MyKey and values of type MyValue (it might, for example, be some subclass of SequenceFileInputFormat<MyKey, MyValue>).
import java.io.IOException;

import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReaderWrapper;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

public class CombinedMyFormat extends CombineFileInputFormat<MyKey, MyValue> {
    // exists merely to fix the key/value types and
    // inject the delegate format into the superclass;
    // if MyFormat does not use state, consider a constant instead
    private static class CombineMyKeyMyValueReaderWrapper
            extends CombineFileRecordReaderWrapper<MyKey, MyValue> {
        protected CombineMyKeyMyValueReaderWrapper(
                CombineFileSplit split, TaskAttemptContext ctx, Integer idx)
                throws IOException, InterruptedException {
            super(new MyFormat(), split, ctx, idx);
        }
    }

    @Override
    public RecordReader<MyKey, MyValue> createRecordReader(
            InputSplit split, TaskAttemptContext ctx) throws IOException {
        return new CombineFileRecordReader<MyKey, MyValue>(
                (CombineFileSplit) split, ctx, CombineMyKeyMyValueReaderWrapper.class);
    }
}
In your job driver, you should now be able to just drop in CombinedMyFormat in place of MyFormat. You should also set a maximum split size property to prevent Hadoop from combining the entire input into a single split.
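A minimal driver sketch of that, assuming the classes above; the driver class name, the paths taken from args, and the 128 MB cap are placeholders. mapred.max.split.size is the key the 0.20-era CombineFileInputFormat reads (newer releases use mapreduce.input.fileinputformat.split.maxsize):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyJobDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Cap combined splits (here at 128 MB) so Hadoop does not pack the
        // entire input into one split and run it in a single map task.
        conf.setLong("mapred.max.split.size", 128L * 1024 * 1024);

        Job job = new Job(conf, "combined input example");
        job.setJarByClass(MyJobDriver.class);

        // Drop in CombinedMyFormat where MyFormat used to be.
        job.setInputFormatClass(CombinedMyFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}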