我有一个 MapReduce 作业(job),它的 mapper 和 reducer 使用不同的输出类型。
在 map 阶段我遇到了一个错误:输出收集器(collector)期望收到的是 reducer 的输出类型,而不是 mapper 的输出类型。
我在 mapper 启动时把各个类型记录到了日志中,得到的结果与预期一致。
作业配置
// Reducer output types, i.e. the job's final output key/value classes.
conf.setOutputKeyClass(MapRepresentation.class);
conf.setOutputValueClass(DoubleRepresentation.class);
// Mapper output types, i.e. the intermediate key/value classes.
conf.setMapOutputKeyClass(Text.class);
conf.setMapOutputValueClass(PairWritable.class);
日志输出
2013-09-22 09:41:16,388 INFO SplitByModelNodeMR: *** getMapOutputKeyClass(): class org.apache.hadoop.io.Text
2013-09-22 09:41:16,389 INFO SplitByModelNodeMR: *** getMapOutputValueClass(): class SplitByModelNodeMR$PairWritable
2013-09-22 09:41:16,389 INFO SplitByModelNodeMR: *** getOutputKeyClass(): class MapRepresentation
2013-09-22 09:41:16,389 INFO SplitByModelNodeMR: *** getOutputValueClass(): class DoubleRepresentation
错误
java.io.IOException: wrong key class: org.apache.hadoop.io.Text is not class MapRepresentation
at org.apache.hadoop.io.SequenceFile$BlockCompressWriter.append(SequenceFile.java:1466)
at org.apache.hadoop.mapred.SequenceFileOutputFormat$1.write(SequenceFileOutputFormat.java:71)
at org.apache.hadoop.mapred.MapTask$DirectMapOutputCollector.collect(MapTask.java:716)
at org.apache.hadoop.mapred.MapTask$OldOutputCollector.collect(MapTask.java:526)
at SplitByModelNodeMR$mapper.map(SplitByModelNodeMR.java:299)
at SplitByModelNodeMR$mapper.map(SplitByModelNodeMR.java:245)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:417)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:332)
at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
at org.apache.hadoop.mapred.Child.main(Child.java:262)
编辑
下面是精简后的完整代码:
/**
 * Minimal MapReduce job (old {@code mapred} API) whose mapper and reducer emit
 * different types: the mapper emits {@code (Text, LongWritable)} and the
 * reducer emits {@code (LongWritable, LongWritable)}.
 *
 * <p>Command-line options: {@code -i <input path>} and {@code -o <output path>}.
 */
public class DummyMR implements Tool
{
    private static final Logger log = LoggerFactory.getLogger(DummyMR.class);

    /** Input path used when {@code -i} is not supplied. */
    private static final String DEFAULT_INPUT_PATH = "input";

    /** Output path used when {@code -o} is not supplied. */
    private static final String DEFAULT_OUTPUT_PATH = "output";

    /** Configuration handed over through {@link #setConf(Configuration)}. */
    protected Configuration conf;

    /**
     * Command-line entry point. Runs the tool through {@code ToolRunner} and
     * exits the JVM with the tool's status so that calling scripts can detect
     * a failed run.
     *
     * @param args command-line arguments (generic Hadoop options plus -i/-o)
     */
    public static void main(String[] args)
    {
        int exitCode;
        try
        {
            exitCode = ToolRunner.run(new DummyMR(), args);
        }
        catch (Exception e)
        {
            log.error("Exception caught in main: ", e);
            exitCode = 1;
        }
        System.exit(exitCode);
    }

    /**
     * Configures and synchronously runs the job.
     *
     * @param configuration base configuration the job configuration is copied from
     * @param input         path of the key/value text input
     * @param output        path the sequence-file output is written to
     * @throws IOException if {@code JobClient.runJob} reports a failure
     */
    public static void runJob(Configuration configuration, Path input,
            Path output) throws IOException
    {
        // Passing the class lets Hadoop locate the jar that contains the
        // mapper and reducer.
        JobConf conf = new JobConf(configuration, DummyMR.class);
        conf.setJobName("DummyMR: " + input);

        // reducer (final job output) types
        conf.setOutputKeyClass(LongWritable.class);
        conf.setOutputValueClass(LongWritable.class);
        // mapper (intermediate output) types
        conf.setMapOutputKeyClass(Text.class);
        conf.setMapOutputValueClass(LongWritable.class);

        conf.setMapperClass(mapper.class);
        // The reducer must be registered explicitly; it is what converts the
        // mapper's (Text, LongWritable) records into the declared
        // (LongWritable, LongWritable) job output.
        conf.setReducerClass(reducer.class);
        // With zero reduce tasks the mapper output is written directly by the
        // output format, which validates it against the job output key class
        // and fails with "wrong key class". This job needs a reduce phase.
        if (conf.getNumReduceTasks() == 0)
        {
            conf.setNumReduceTasks(1);
        }

        conf.setInputFormat(KeyValueTextInputFormat.class);
        conf.setOutputFormat(SequenceFileOutputFormat.class);
        // Only registers the input path; the input format in effect is the
        // KeyValueTextInputFormat set above.
        SequenceFileInputFormat.addInputPath(conf, input);
        FileOutputFormat.setOutputPath(conf, output);

        JobClient.runJob(conf);
    }

    /**
     * Mapper that emits the constant pair {@code ("KEY", 5)} for every input
     * record.
     */
    public static class mapper extends MapReduceBase implements
            Mapper<Text, Text, Text, LongWritable>
    {
        /**
         * Logs the configured map-output and job-output key/value classes.
         *
         * @param job configuration of the running job
         */
        @Override
        public void configure(JobConf job)
        {
            super.configure(job);
            log.info("*** getMapOutputKeyClass(): {}", job.getMapOutputKeyClass());
            log.info("*** getMapOutputValueClass(): {}", job.getMapOutputValueClass());
            log.info("*** getOutputKeyClass(): {}", job.getOutputKeyClass());
            log.info("*** getOutputValueClass(): {}", job.getOutputValueClass());
        }

        /**
         * Emits {@code ("KEY", 5)}; the input key and value are ignored.
         *
         * @param k               input key (unused)
         * @param v               input value (unused)
         * @param outputCollector collector receiving the emitted pair
         * @param reported        progress reporter (unused)
         * @throws IOException if the collector rejects the record
         */
        @Override
        public void map(Text k, Text v, OutputCollector<Text, LongWritable> outputCollector,
                Reporter reported) throws IOException
        {
            // Collector failures are deliberately not caught here: logging and
            // continuing would hide the error and produce an empty output.
            outputCollector.collect(new Text("KEY"), new LongWritable(5));
        }
    }

    /**
     * Reducer that emits one default-constructed
     * {@code (LongWritable, LongWritable)} pair per input value.
     */
    public static class reducer extends MapReduceBase implements
            Reducer<Text, LongWritable, LongWritable, LongWritable>
    {
        /**
         * Emits one pair of fresh {@code LongWritable} instances for each
         * value of the key.
         *
         * @param key                   the grouped key (unused)
         * @param values                values for the key; only counted, not read
         * @param sharedOutputCollector collector receiving the emitted pairs
         * @param reporter              progress reporter (unused)
         * @throws IOException if the collector rejects a record
         */
        @Override
        public void reduce(Text key, Iterator<LongWritable> values,
                OutputCollector<LongWritable, LongWritable> sharedOutputCollector,
                Reporter reporter)
                throws IOException
        {
            while (values.hasNext())
            {
                // Advance the iterator; the value itself is not used.
                values.next();
                sharedOutputCollector.collect(new LongWritable(), new LongWritable());
            }
        }
    }

    /**
     * Parses {@code -i}/{@code -o} and runs the job.
     *
     * @param args command-line arguments
     * @return 0 on success, -1 on a usage or parse error, 1 if the job failed
     * @throws Exception declared by {@code Tool}; failures inside the job are
     *                   logged and reported through the return value instead
     */
    @Override
    public int run(String[] args) throws Exception
    {
        GenericOptionsParser parserGO = new GenericOptionsParser(getConf(), args);
        String[] remaining = parserGO.getRemainingArgs();
        log.info("Starting.");
        try
        {
            Options options = new Options();
            options.addOption("i", true, "Input Path");
            options.addOption("o", true, "Output Path");

            if (remaining.length == 0)
            {
                new HelpFormatter().printHelp(DummyMR.class.getName(), options);
                return -1;
            }

            String inputPath = DEFAULT_INPUT_PATH;
            String outputPath = DEFAULT_OUTPUT_PATH;
            try
            {
                CommandLine line = new PosixParser().parse(options, remaining);
                if (line.hasOption("i"))
                {
                    inputPath = line.getOptionValue("i");
                }
                if (line.hasOption("o"))
                {
                    outputPath = line.getOptionValue("o");
                }
            }
            catch (ParseException e)
            {
                log.error("CMD Line Parsing failed ", e);
                return -1;
            }

            runJob(getConf(), new Path(inputPath), new Path(outputPath));
        }
        catch (Exception e)
        {
            log.error("Exception caught in main: ", e);
            // Report the failure instead of falling through to "Done." / 0.
            return 1;
        }
        log.info("Done.");
        return 0;
    }

    /** @return the configuration previously supplied via {@link #setConf(Configuration)} */
    @Override
    public Configuration getConf()
    {
        return this.conf;
    }

    /** @param conf the configuration this tool should use */
    @Override
    public void setConf(Configuration conf)
    {
        this.conf = conf;
    }
}