1

我正在使用 Avro 对 MapR 进行编程,并且是针对 Avro 的真正初学者。输入和输出都是具有特定模式的 avro 格式。

这是我使用 MR1 的mapreduce API 的映射器和化简器:

public class UserClassifyMapReduce extends Configured implements Tool {

private final static Logger logger = LoggerFactory.getLogger(UserClassifyMapReduce.class);

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new UserClassifyMapReduce(), args);
        System.exit(res);
    }

    @Override
    public int run(String[] args) throws Exception {
        if (args.length < 2) {
            logger.error("Usage: UserClassify <intputfile> <outputfolder>");
            System.exit(-1);
        }
        Configuration conf = new Configuration();
        Job job = new Job(getConf());
        job.setJobName("UserClassify");

        AvroJob.setInputKeySchema(job, NetflowRecord.getClassSchema());
        AvroJob.setOutputKeySchema(job, NetflowRecord.getClassSchema());

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        Path outPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outPath);
        outPath.getFileSystem(conf).delete(outPath, true);

        job.setJarByClass(DataSerializeMapReduce.class);
        job.setMapperClass(MyAvroMap.class);
        job.setReducerClass(MyAvroReduce.class);

        job.setInputFormatClass(AvroKeyInputFormat.class);
        job.setOutputFormatClass(AvroKeyOutputFormat.class);

        job.setMapOutputKeyClass(AvroKey.class);
        job.setMapOutputValueClass(AvroValue.class);

        job.setOutputKeyClass(AvroKey.class);
        job.setOutputValueClass(NullWritable.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static class MyAvroMap extends Mapper<AvroKey<NetflowRecord>, NullWritable,
                                        AvroKey<CharSequence>, AvroValue<NetflowRecord>>{
        @Override
        protected void map(AvroKey<NetflowRecord> key, NullWritable value, Context context) 
                                        throws IOException, InterruptedException{
            CharSequence devMac = key.datum().getDevMacAddr();
            context.write(new AvroKey<CharSequence>(devMac), new AvroValue<NetflowRecord>(key.datum()));
        }
    }

    public static class MyAvroReduce extends Reducer<AvroKey<CharSequence>, AvroValue<NetflowRecord>, 
                                        AvroKey<NetflowRecord>, NullWritable>{

        @Override
        protected void reduce(AvroKey<CharSequence> key, Iterable<AvroValue<NetflowRecord>> values, Context context) 
                                        throws IOException, InterruptedException{
            (...code)
        }
    }

}

CastError 抛出类似的消息

    java.lang.Exception: java.lang.ClassCastException: class org.apache.avro.mapred.AvroKey
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:354)
Caused by: java.lang.ClassCastException: class org.apache.avro.mapred.AvroKey
    at java.lang.Class.asSubclass(Class.java:3116)
    at org.apache.hadoop.mapred.JobConf.getOutputKeyComparator(JobConf.java:795)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.<init>(MapTask.java:964)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:673)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:756)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:364)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:223)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
    at java.util.concurrent.FutureTask.run(FutureTask.java:166)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:724)

一个非常简单的程序。你对这个问题有什么想法吗?非常感谢。

贾敏

4

1 回答 1

1

您似乎缺少映射器输出键的架构AvroKey<CharSequence>。添加相应的架构就足够了:

AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.STRING));
于 2013-10-23T00:53:46.290 回答