
I'm having some trouble getting Amazon EMR to accept a custom InputFileFormat:

public class Main extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new JobConf(), new Main(), args);
        System.exit(res);
    }


    public int run(String[] args) throws Exception {

        Path inputPath = new Path(args[0]);
        Path outputPath = new Path(args[1]);

        System.out.println("Input  path: "+inputPath+"\n");
        System.out.println("Output path: "+outputPath+"\n");

        Configuration conf = getConf();
        Job job = new Job(conf, "ProcessDocs");

        job.setJarByClass(Main.class);

        job.setMapperClass(Map.class);
        job.setReducerClass(Reducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setInputFormatClass(XmlInputFormat.class);

        TextInputFormat.setInputPaths(job, inputPath);
        TextOutputFormat.setOutputPath(job, outputPath);

        job.waitForCompletion(true);

        return 0;
    }   
}

Looking at the log file:

2012-06-04 23:35:20,053 INFO org.apache.hadoop.mapred.JobClient (main): Default number of map tasks: null
2012-06-04 23:35:20,054 INFO org.apache.hadoop.mapred.JobClient (main): Setting default number of map tasks based on cluster size to : 6
2012-06-04 23:35:20,054 INFO org.apache.hadoop.mapred.JobClient (main): Default number of reduce tasks: 1
2012-06-04 23:35:20,767 INFO org.apache.hadoop.mapreduce.lib.input.FileInputFormat (main): Total input paths to process : 1
2012-06-04 23:35:20,813 INFO com.hadoop.compression.lzo.GPLNativeCodeLoader (main): Loaded native gpl library
2012-06-04 23:35:20,886 WARN com.hadoop.compression.lzo.LzoCodec (main): Could not find build properties file with revision hash
2012-06-04 23:35:20,886 INFO com.hadoop.compression.lzo.LzoCodec (main): Successfully loaded & initialized native-lzo library [hadoop-lzo rev UNKNOWN]
2012-06-04 23:35:20,906 WARN org.apache.hadoop.io.compress.snappy.LoadSnappy (main): Snappy native library is available
2012-06-04 23:35:20,906 INFO org.apache.hadoop.io.compress.snappy.LoadSnappy (main): Snappy native library loaded
2012-06-04 23:35:22,240 INFO org.apache.hadoop.mapred.JobClient (main): Running job: job_201206042333_0001

It seems that Hadoop on EMR falls back to the default InputFileFormat reader... What am I doing wrong?

Note: I'm not getting any error from Hadoop regarding XmlInputClass.

Note 2: I do see <property><name>mapreduce.inputformat.class</name><value>com.xyz.XmlInputFormat</value></property> in the jobs/some_job_id.conf.xml file.

Update:

public class XmlInputFormat extends TextInputFormat {

  public static final String START_TAG_KEY = "xmlinput.start";
  public static final String END_TAG_KEY = "xmlinput.end";

  public RecordReader<LongWritable,Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {

      System.out.println("Creating a new 'XmlRecordReader'");

      return new XmlRecordReader((FileSplit) split, context.getJobConf());
  }

  /*
  @Override
  public RecordReader<LongWritable,Text> getRecordReader(InputSplit inputSplit,
                                                         JobConf jobConf,
                                                         Reporter reporter) throws IOException {
    return new XmlRecordReader((FileSplit) inputSplit, jobConf);
  }
  */

  /**
   * XMLRecordReader class to read through a given xml document to output xml
   * blocks as records as specified by the start tag and end tag
   * 
   */
  public static class XmlRecordReader implements RecordReader<LongWritable,Text> {
    private final byte[] startTag;
    private final byte[] endTag;
    private final long start;
    private final long end;
    private final FSDataInputStream fsin;
    private final DataOutputBuffer buffer = new DataOutputBuffer();

    public XmlRecordReader(FileSplit split, JobConf jobConf) throws IOException {
      startTag = jobConf.get(START_TAG_KEY).getBytes("utf-8");
      endTag = jobConf.get(END_TAG_KEY).getBytes("utf-8");

      System.out.println("XmlInputFormat: Start Tag: "+startTag);
      System.out.println("XmlInputFormat: End Tag  : "+endTag);

      // open the file and seek to the start of the split
      start = split.getStart();
      end = start + split.getLength();
      Path file = split.getPath();
      FileSystem fs = file.getFileSystem(jobConf);
      fsin = fs.open(split.getPath());
      fsin.seek(start);
    }
    ...

2 Answers


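If XmlInputFormat is not part of the same jar that contains main(), you probably need to either build it into a "subfolder" named "lib" inside your main jar, or create a bootstrap action that copies the extra jar containing XmlInputFormat from S3 into the magic folder /home/hadoop/lib, which is part of the Hadoop classpath by default on EMR.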
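To check whether the class is actually visible to a given JVM, and which jar it came from, something like the following can help. It is only a sketch using plain JDK reflection: the helper name ClasspathProbe is hypothetical, and com.xyz.XmlInputFormat is just the class name taken from the job configuration quoted in the question. It reports on the JVM it runs in, so it would have to be started with the same classpath as the Hadoop process you care about.

```java
import java.net.URL;
import java.security.CodeSource;

/** Hypothetical helper: reports whether a class is loadable and where it was loaded from. */
final class ClasspathProbe {

    /** Returns the jar or directory that provided the class, or a short explanation if it is unavailable. */
    static String locate(String className) {
        try {
            // initialize=false: resolve the class without running its static initializers
            Class<?> cls = Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            CodeSource source = cls.getProtectionDomain().getCodeSource();
            if (source == null || source.getLocation() == null) {
                // Classes shipped with the JDK usually have no code source
                return "loaded, but no code source is recorded";
            }
            URL location = source.getLocation();
            return "loaded from " + location;
        } catch (ClassNotFoundException | LinkageError e) {
            return "NOT on the classpath: " + e;
        }
    }

    public static void main(String[] args) {
        // Default class name is the one from the question's conf.xml; pass another as the first argument
        String name = args.length > 0 ? args[0] : "com.xyz.XmlInputFormat";
        System.out.println(name + " -> " + locate(name));
    }
}
```

If the class comes back as "NOT on the classpath", the jar is not where that JVM looks for it, which points at the packaging options described above.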

It is certainly not assuming FileInputFormat, which is abstract.

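Based on your edit, I think the premise of your question is wrong. I suspect the input format was indeed found and used. A System.out.println from a task attempt will not show up in the job's syslog, although it may show up in the stdout digest.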
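The mapreduce.inputformat.class entry you found in the job's conf.xml points the same way. If you want to read that property programmatically rather than by eye, here is a minimal sketch using only the JDK's XML parser. It assumes the file has the usual <property><name/><value/></property> layout; the class name ConfProbe and the inline sample XML are hypothetical, and in practice you would pass in the contents of jobs/some_job_id.conf.xml.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

/** Hypothetical helper: looks up one property in a Hadoop-style configuration XML string. */
final class ConfProbe {

    /** Returns the value of the named property, or null if the XML does not define it. */
    static String readProperty(String xml, String key) {
        try {
            Document doc = DocumentBuilderFactory.newInstance()
                    .newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
            NodeList properties = doc.getElementsByTagName("property");
            for (int i = 0; i < properties.getLength(); i++) {
                Element property = (Element) properties.item(i);
                if (key.equals(textOf(property, "name"))) {
                    return textOf(property, "value");
                }
            }
            return null;
        } catch (Exception e) {
            // Malformed XML or parser setup failure
            throw new IllegalArgumentException("could not parse configuration XML", e);
        }
    }

    /** Returns the trimmed text of the first child element with the given tag, or null if absent. */
    private static String textOf(Element parent, String tag) {
        NodeList nodes = parent.getElementsByTagName(tag);
        return nodes.getLength() == 0 ? null : nodes.item(0).getTextContent().trim();
    }

    public static void main(String[] args) {
        // Sample built from the property quoted in the question
        String sample = "<configuration>"
                + "<property><name>mapreduce.inputformat.class</name>"
                + "<value>com.xyz.XmlInputFormat</value></property>"
                + "</configuration>";
        System.out.println(readProperty(sample, "mapreduce.inputformat.class"));
    }
}
```

Seeing your own class name there means the job was submitted with the custom input format configured. Whether its record reader then behaves as expected is a separate question that the task attempt logs answer.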

answered 2012-06-05T00:50:30.397

Here is another simple way I found to run this kind of custom jar file on EMR or Hadoop: http://www.applams.com/2014/05/using-custom-streaming-jar-using-custom.html

answered 2014-05-20T02:26:17.443