我有很多文件,其中一些非常小。为了减少映射器的数量,我想使用 CombineFileInputFormat。文件名将用作映射器输出的键的一部分。
我尝试了以下一些方法来获取CombineFileSplit中每个块的文件名,但都失败了。
1)我conf.set("map.input.file", split.getPath(idx).toString());
在函数中看到
initNextRecordReader()
类CombineFileRecordReader
。但NullPointerException
发生在我的map()
功能中,如context.getConfiguration().get("map.input.file")
返回null
。
2)我也尝试((FileSplit) (context.getInputSplit())).getPath().getName()
在映射器中,但java.lang.ClassCastException: org.apache.hadoop.mapreduce.lib.input.CombineFileSplit cannot be cast to org.apache.hadoop.mapreduce.lib.input.FileSplit
发生了。
那么如何获取 CombineFileSplit 中的每个文件名?
==================================================== ==========
输入文件是 lzo 压缩的,暂时没有索引。
以下是我的代码:
我像这样实现 CombineFileInputFormat:
public class CombinedInputFormat extends CombineFileInputFormat<LongWritable, Text> {
@Override
public RecordReader<LongWritable, Text> createRecordReader(InputSplit arg0,
TaskAttemptContext arg1) throws IOException {
// TODO Auto-generated method stub
return new CombineFileRecordReader<LongWritable, Text>((CombineFileSplit) arg0, arg1, CombineLzoLineRecordReader.class);
}
}
这是扩展 LzoLineRecordReader 的 CombineLzoLineRecordReader:
public class CombineLzoLineRecordReader extends LzoLineRecordReader {
private int index;
public CombineLzoLineRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index)
throws IOException, InterruptedException {
this.index = index;
}
public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException, InterruptedException {
CombineFileSplit combineSplit = (CombineFileSplit) genericSplit;
FileSplit fileSplit = new FileSplit(combineSplit.getPath(index), combineSplit.getOffset(index), combineSplit.getLength(index), combineSplit.getLocations());
super.initialize(fileSplit, context);
}
}
我的地图方法是这样的:
private String getName(String filePath) {
String[] filePathDir = filePath.split("/");
return filePathDir[filePathDir.length - 1];
}
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String name = getName(context.getConfiguration().get("map.input.file"));
line = new String(value.getBytes(), 0, value.getLength(), "ISO-8859-1");
lineFields = line.split("\t",-1);
if (lineFields != null && lineFields.length >= 20) {
// do something ...
}
}
和错误信息:
13/06/14 17:02:50 INFO mapred.JobClient: Task Id : attempt_201209101415_762760_m_000000_0, Status : FAILED
java.lang.NullPointerException
at com.netease.devilfish.hadoop.job.LogAnalysisDailyMapper.getName(Unknown Source)