1

我有一个习惯MyInputFormat,假设要处理多行输入的记录边界问题。但是当我把它MyInputFormat放入我的 UDF 加载函数中时。如下:

import org.apache.hadoop.mapreduce.InputFormat;
public class EccUDFLogLoader extends LoadFunc {
    @Override
    public InputFormat getInputFormat() {
        System.out.println("I am in getInputFormat function");
        return new MyInputFormat();
    }
}

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
public class MyInputFormat extends TextInputFormat {
    public RecordReader createRecordReader(InputSplit inputSplit, JobConf jobConf) throws IOException {
        System.out.prinln("I am in createRecordReader");
        //MyRecordReader suppose to handle record boundary
        return new MyRecordReader((FileSplit)inputSplit, jobConf);
    }
}

对于每个映射器,它打印出来I am in getInputFormat function但不是I am in createRecordReader. 我想知道是否有人可以提供有关如何将我的服装 MyInputFormat 连接到 PIG 的 UDF 加载器的提示?非常感谢。

我在 Amazon EMR 上使用 PIG。

4

1 回答 1

1

您的签名与父类的签名不匹配(您缺少 Reporter 参数),试试这个:

@Override
public RecordReader<LongWritable, Text> getRecordReader(
        InputSplit inputSplit, JobConf jobConf, Reporter reporter)
             throws IOException {
  System.out.prinln("I am in createRecordReader");
  //MyRecordReader suppose to handle record boundary
  return new MyRecordReader((FileSplit)inputSplit, jobConf);
}

编辑对不起,我之前没有发现这一点,正如您所指出的,您需要使用新的 API 签名:

@Override
public RecordReader<LongWritable, Text> 
      createRecordReader(InputSplit split,
             TaskAttemptContext context) {
  System.out.prinln("I am in createRecordReader");
  //MyRecordReader suppose to handle record boundary
  return new MyRecordReader((FileSplit)inputSplit, jobConf);
}

并且您的 MyRecordReader 类需要扩展org.apache.hadoop.mapreduce.RecordReader该类

于 2012-12-19T01:33:05.153 回答