我正在使用 Avro 对 MapR 进行编程,并且是针对 Avro 的真正初学者。输入和输出都是具有特定模式的 avro 格式。
这是我使用 MR1 的mapreduce API 的映射器和化简器:
public class UserClassifyMapReduce extends Configured implements Tool {
private final static Logger logger = LoggerFactory.getLogger(UserClassifyMapReduce.class);
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new UserClassifyMapReduce(), args);
System.exit(res);
}
@Override
public int run(String[] args) throws Exception {
if (args.length < 2) {
logger.error("Usage: UserClassify <intputfile> <outputfolder>");
System.exit(-1);
}
Configuration conf = new Configuration();
Job job = new Job(getConf());
job.setJobName("UserClassify");
AvroJob.setInputKeySchema(job, NetflowRecord.getClassSchema());
AvroJob.setOutputKeySchema(job, NetflowRecord.getClassSchema());
FileInputFormat.setInputPaths(job, new Path(args[0]));
Path outPath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outPath);
outPath.getFileSystem(conf).delete(outPath, true);
job.setJarByClass(DataSerializeMapReduce.class);
job.setMapperClass(MyAvroMap.class);
job.setReducerClass(MyAvroReduce.class);
job.setInputFormatClass(AvroKeyInputFormat.class);
job.setOutputFormatClass(AvroKeyOutputFormat.class);
job.setMapOutputKeyClass(AvroKey.class);
job.setMapOutputValueClass(AvroValue.class);
job.setOutputKeyClass(AvroKey.class);
job.setOutputValueClass(NullWritable.class);
return job.waitForCompletion(true) ? 0 : 1;
}
public static class MyAvroMap extends Mapper<AvroKey<NetflowRecord>, NullWritable,
AvroKey<CharSequence>, AvroValue<NetflowRecord>>{
@Override
protected void map(AvroKey<NetflowRecord> key, NullWritable value, Context context)
throws IOException, InterruptedException{
CharSequence devMac = key.datum().getDevMacAddr();
context.write(new AvroKey<CharSequence>(devMac), new AvroValue<NetflowRecord>(key.datum()));
}
}
public static class MyAvroReduce extends Reducer<AvroKey<CharSequence>, AvroValue<NetflowRecord>,
AvroKey<NetflowRecord>, NullWritable>{
@Override
protected void reduce(AvroKey<CharSequence> key, Iterable<AvroValue<NetflowRecord>> values, Context context)
throws IOException, InterruptedException{
(...code)
}
}
}
CastError 抛出类似的消息
java.lang.Exception: java.lang.ClassCastException: class org.apache.avro.mapred.AvroKey
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:354)
Caused by: java.lang.ClassCastException: class org.apache.avro.mapred.AvroKey
at java.lang.Class.asSubclass(Class.java:3116)
at org.apache.hadoop.mapred.JobConf.getOutputKeyComparator(JobConf.java:795)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.<init>(MapTask.java:964)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:673)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:756)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:364)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:223)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
at java.util.concurrent.FutureTask.run(FutureTask.java:166)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)
一个非常简单的程序。你对这个问题有什么想法吗?非常感谢。
贾敏