0

我有一份为映射器和减速器提供不同输出类型的工作。
在映射过程中,我收到一个错误,这意味着收集器需要减速器输出类型的输出,而不是映射器输出类型。

我在映射器启动期间记录类型,并获得预期的结果。

工作设置

        // reducer
    conf.setOutputKeyClass(MapRepresentation.class);
    conf.setOutputValueClass(DoubleRepresentation.class);

    // mapper
    conf.setMapOutputKeyClass(Text.class);
    conf.setMapOutputValueClass(PairWritable.class);

日志输出

2013-09-22 09:41:16,388 INFO SplitByModelNodeMR: *** getMapOutputKeyClass(): class org.apache.hadoop.io.Text

2013-09-22 09:41:16,389 INFO SplitByModelNodeMR: *** getMapOutputValueClass(): class SplitByModelNodeMR$PairWritable

2013-09-22 09:41:16,389 INFO SplitByModelNodeMR: *** getOutputKeyClass(): class MapRepresentation

2013-09-22 09:41:16,389 INFO SplitByModelNodeMR: *** getOutputValueClass(): class DoubleRepresentation

错误

java.io.IOException: wrong key class: 6fcd88ae126f2f76-6 is not class MapRepresentation
at org.apache.hadoop.io.SequenceFile$BlockCompressWriter.append(SequenceFile.java:1466)
at org.apache.hadoop.mapred.SequenceFileOutputFormat$1.write(SequenceFileOutputFormat.java:71)
at org.apache.hadoop.mapred.MapTask$DirectMapOutputCollector.collect(MapTask.java:716)
at org.apache.hadoop.mapred.MapTask$OldOutputCollector.collect(MapTask.java:526)
at SplitByModelNodeMR$mapper.map(SplitByModelNodeMR.java:299)
at SplitByModelNodeMR$mapper.map(SplitByModelNodeMR.java:245)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:417)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:332)
at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
at org.apache.hadoop.mapred.Child.main(Child.java:262)

编辑

这是完整的代码(精简)

public class DummyMR implements Tool
{
    private static final Logger log = LoggerFactory.getLogger(DummyMR.class);

    public static void main(String[] args)
    {
        try
        {
            ToolRunner.run(new DummyMR(), args);
        }
        catch (Exception e)
        {
            log.error("Exception caught in main: ", e);
        }
    }

    public static void runJob(Configuration configuration, Path input,
            Path output) throws IOException
    {
        JobConf conf = new JobConf(configuration);

        // reducer
        conf.setOutputKeyClass(LongWritable.class);
        conf.setOutputValueClass(LongWritable.class);

        // mapper
        conf.setMapOutputKeyClass(Text.class);
        conf.setMapOutputValueClass(LongWritable.class);
        conf.setMapperClass(mapper.class);

        conf.setJobName("DummyMR: " + input.toString());

        conf.setInputFormat(KeyValueTextInputFormat.class);
        conf.setOutputFormat(SequenceFileOutputFormat.class);

        SequenceFileInputFormat.addInputPath(conf, input);

        FileOutputFormat.setOutputPath(conf, output);

        RunningJob job = JobClient.runJob(conf);
        return;
    }

    public static class mapper extends MapReduceBase implements
            Mapper<Text, Text, Text, LongWritable>
    {
        @Override
        public void configure(JobConf job)
        {
            super.configure(job);
            log.info("*** getMapOutputKeyClass(): "
                    + job.getMapOutputKeyClass().toString());
            log.info("*** getMapOutputValueClass(): "
                    + job.getMapOutputValueClass().toString());
            log.info("*** getOutputKeyClass(): "
                    + job.getOutputKeyClass().toString());
            log.info("*** getOutputValueClass(): "
                    + job.getOutputValueClass().toString());
        }

        @Override
        public void close() throws IOException
        {
            super.close();
        }

        @Override
        public void map(Text k, Text v, OutputCollector<Text, LongWritable> outputCollector,
                Reporter reported) throws IOException
        {
            try
            {
                LongWritable val = new LongWritable(5);
                Text key = new Text("KEY");
                outputCollector.collect(key, val);
            }
            catch (Exception e)
            {
                log.warn("map()", e);
            }
        }
    }

    public static class reducer extends MapReduceBase implements
            Reducer<Text, LongWritable, LongWritable, LongWritable>
    {

        @Override
        public void configure(JobConf job)
        {
            super.configure(job);
        }

        @Override
        public void close() throws IOException
        {
            super.close();
        }

        @Override
        public void reduce(Text key, Iterator<LongWritable> values, 
                OutputCollector<LongWritable, LongWritable> sharedOutputCollector, 
                Reporter reporter) 
                throws IOException
        {
            while (values.hasNext())
            {
                LongWritable value = values.next();
                sharedOutputCollector.collect(new LongWritable(),new LongWritable());
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception
    {
        GenericOptionsParser parserGO = new GenericOptionsParser(getConf(),
                args);
        args = parserGO.getRemainingArgs();

        log.info("Starting.");
        try
        {
            Options options = new Options();
            options.addOption("i", true, "Input Path");
            options.addOption("o", true, "Output Path");
            CommandLineParser parser = new PosixParser();
            if (args.length == 0)
            {
                HelpFormatter formatter = new HelpFormatter();
                formatter.printHelp(NewJoiner.class.getName(), options);
                return -1;
            }

            String inputPath = "input";
            String outputPath = "ouput";

            CommandLine line;
            try
            {
                line = parser.parse(options, args);
                if (line.hasOption("i"))
                {
                    inputPath = line.getOptionValue("i");
                }
                if (line.hasOption("o"))
                {
                    outputPath = line.getOptionValue("o");
                }
            }
            catch (ParseException e)
            {
                log.error("CMD Line Parsing failed ",e);
                return -1;
            }

            runJob(getConf(), new Path(inputPath), new Path(outputPath));
        }
        catch (Exception e)
        {
            log.error("Exception caught in main: ", e);
        }
        log.info("Done.");
        return 0;
    }

    @Override
    public Configuration getConf()
    {
        return this.conf;
    }

    @Override
    public void setConf(Configuration conf)
    {
        this.conf = conf;
    }

    protected Configuration conf;
}
4

1 回答 1

1

您还需要设置以下参数:

conf.setReducerClass(reducer.class);
conf.setJarByClass(DummyMR.class);
于 2013-09-22T13:42:21.493 回答