I'm having some trouble getting Amazon EMR to accept a custom InputFileFormat:
    public class Main extends Configured implements Tool {

        public static void main(String[] args) throws Exception {
            int res = ToolRunner.run(new JobConf(), new Main(), args);
            System.exit(res);
        }

        public int run(String[] args) throws Exception {
            Path inputPath = new Path(args[0]);
            Path outputPath = new Path(args[1]);
            System.out.println("Input path: "+inputPath+"\n");
            System.out.println("Output path: "+outputPath+"\n");

            Configuration conf = getConf();
            Job job = new Job(conf, "ProcessDocs");
            job.setJarByClass(Main.class);
            job.setMapperClass(Map.class);
            job.setReducerClass(Reducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setInputFormatClass(XmlInputFormat.class);

            TextInputFormat.setInputPaths(job, inputPath);
            TextOutputFormat.setOutputPath(job, outputPath);

            job.waitForCompletion(true);
            return 0;
        }
    }
Looking at the log file:
2012-06-04 23:35:20,053 INFO org.apache.hadoop.mapred.JobClient (main): Default number of map tasks: null
2012-06-04 23:35:20,054 INFO org.apache.hadoop.mapred.JobClient (main): Setting default number of map tasks based on cluster size to : 6
2012-06-04 23:35:20,054 INFO org.apache.hadoop.mapred.JobClient (main): Default number of reduce tasks: 1
2012-06-04 23:35:20,767 INFO org.apache.hadoop.mapreduce.lib.input.FileInputFormat (main): Total input paths to process : 1
2012-06-04 23:35:20,813 INFO com.hadoop.compression.lzo.GPLNativeCodeLoader (main): Loaded native gpl library
2012-06-04 23:35:20,886 WARN com.hadoop.compression.lzo.LzoCodec (main): Could not find build properties file with revision hash
2012-06-04 23:35:20,886 INFO com.hadoop.compression.lzo.LzoCodec (main): Successfully loaded & initialized native-lzo library [hadoop-lzo rev UNKNOWN]
2012-06-04 23:35:20,906 WARN org.apache.hadoop.io.compress.snappy.LoadSnappy (main): Snappy native library is available
2012-06-04 23:35:20,906 INFO org.apache.hadoop.io.compress.snappy.LoadSnappy (main): Snappy native library loaded
2012-06-04 23:35:22,240 INFO org.apache.hadoop.mapred.JobClient (main): Running job: job_201206042333_0001
It seems that Hadoop on EMR is assuming the default InputFileFormat reader... What am I doing wrong?
Note: I am not getting any errors from Hadoop about XmlInputClass.
*Note 2:* I do see <property><name>mapreduce.inputformat.class</name><value>com.xyz.XmlInputFormat</value></property> in the jobs/some_job_id.conf.xml file.
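One thing that could produce exactly this symptom (the input format is registered in the job configuration, no error is reported, yet the default reader runs) is a factory method that overloads the framework's method instead of overriding it. The sketch below only illustrates that Java behaviour. `BaseFormat`, `CustomFormat`, `NewContext`, `OldContext`, and `readerFor` are hypothetical stand-ins, not Hadoop classes, and whether this is what happens in my job is an assumption I have not confirmed.

```java
// Stand-in types for illustration only; these are NOT Hadoop classes.
class NewContext {}
class OldContext {}

class BaseFormat {
    // The method the (hypothetical) framework calls.
    public String createReader(NewContext ctx) {
        return "default line reader";
    }
}

class CustomFormat extends BaseFormat {
    // No @Override here. The parameter type differs from the base method,
    // so this is an overload, not an override, and the framework call below
    // never reaches it. Adding @Override would make the compiler reject it.
    public String createReader(OldContext ctx) {
        return "custom xml reader";
    }
}

public class OverrideCheck {
    // Mimics a framework that only knows the base type and the new context.
    static String readerFor(BaseFormat format) {
        return format.createReader(new NewContext());
    }

    public static void main(String[] args) {
        // Prints "default line reader" even though a CustomFormat is passed in.
        System.out.println(readerFor(new CustomFormat()));
    }
}
```

If that is the cause, annotating the factory method in the custom input format with `@Override` should turn the silent fallback into a compile-time error that names the mismatched signature.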
Update:
    public class XmlInputFormat extends TextInputFormat {

        public static final String START_TAG_KEY = "xmlinput.start";
        public static final String END_TAG_KEY = "xmlinput.end";

        public RecordReader<LongWritable,Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {
            System.out.println("Creating a new 'XmlRecordReader'");
            return new XmlRecordReader((FileSplit) split, context.getJobConf());
        }

        /*
        @Override
        public RecordReader<LongWritable,Text> getRecordReader(InputSplit inputSplit,
                                                               JobConf jobConf,
                                                               Reporter reporter) throws IOException {
            return new XmlRecordReader((FileSplit) inputSplit, jobConf);
        }
        */

        /**
         * XMLRecordReader class to read through a given xml document to output xml
         * blocks as records as specified by the start tag and end tag
         *
         */
        public static class XmlRecordReader implements RecordReader<LongWritable,Text> {
            private final byte[] startTag;
            private final byte[] endTag;
            private final long start;
            private final long end;
            private final FSDataInputStream fsin;
            private final DataOutputBuffer buffer = new DataOutputBuffer();

            public XmlRecordReader(FileSplit split, JobConf jobConf) throws IOException {
                startTag = jobConf.get(START_TAG_KEY).getBytes("utf-8");
                endTag = jobConf.get(END_TAG_KEY).getBytes("utf-8");

                System.out.println("XmlInputFormat: Start Tag: "+startTag);
                System.out.println("XmlInputFormat: End Tag : "+endTag);

                // open the file and seek to the start of the split
                start = split.getStart();
                end = start + split.getLength();
                Path file = split.getPath();
                FileSystem fs = file.getFileSystem(jobConf);
                fsin = fs.open(split.getPath());
                fsin.seek(start);
            }
...