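I'm trying to write a Mapper (using the .mapreduce API) that consumes Avro 1.7.4 files. I'm using the specific API (via the Maven Avro plugin) to generate my objects from an .avdl file.
I have verified that the Avro file is generated correctly, and I can read it back with the avro-to-json function from the Avro tools jar.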
The error I get is:
java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to org.lab41.cyprus.domain.NetworkRecord
at org.lab41.cyprus.mapreduce.RollupAvroFilesMapper.map(RollupAvroFilesMapper.java:45)
at org.lab41.cyprus.mapreduce.RollupAvroFilesMapper.map(RollupAvroFilesMapper.java:23)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:140)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:645)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:325)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:263)
The driver class is:
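import org.apache.avro.Schema;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.lab41.cyprus.domain.NetworkRecord;
// Assumption: RollupAvroFilesMapper and RollupAvroFilesReducer live in the same package as this driver.

public class RollupAvroFiles extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        String input, output;
        if (otherArgs.length == 2) {
            input = otherArgs[0];
            output = otherArgs[1];
        } else {
            return 1;
        }

        /** configure Job **/
        Job job = new Job(conf, "RollupAvroFiles");
        job.setJarByClass(RollupAvroFiles.class);
        job.setUserClassesTakesPrecedence(true);
        job.setNumReduceTasks(1);

        // Input: Avro container files read as AvroKey<NetworkRecord>
        FileInputFormat.setInputPaths(job, new Path(input));
        job.setInputFormatClass(AvroKeyInputFormat.class);
        AvroJob.setInputKeySchema(job, NetworkRecord.SCHEMA$);

        // Map output: AvroKey<Long> -> AvroValue<NetworkRecord>
        job.setMapperClass(RollupAvroFilesMapper.class);
        AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.LONG));
        AvroJob.setMapOutputValueSchema(job, NetworkRecord.SCHEMA$);
        job.setMapOutputKeyClass(AvroKey.class);
        job.setMapOutputValueClass(AvroValue.class);

        // Reduce output: Avro container files of NetworkRecord
        job.setReducerClass(RollupAvroFilesReducer.class);
        AvroJob.setOutputKeySchema(job, NetworkRecord.SCHEMA$);
        job.setOutputFormatClass(AvroKeyOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(output));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new RollupAvroFiles(), args);
        System.exit(exitCode);
    }
}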
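Since the driver calls setUserClassesTakesPrecedence(true) to put the job's own jars ahead of Hadoop's, it may be worth confirming at runtime which jar actually supplied a given class (for example org.apache.avro.generic.GenericData or org.lab41.cyprus.domain.NetworkRecord). The sketch below is a stdlib-only helper under that assumption; ClassOrigin, describe and copiesOf are hypothetical names, not part of Hadoop or Avro. More than one entry from copiesOf for an Avro class would suggest that two Avro versions are visible to the task.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.security.CodeSource;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical diagnostic helper; not part of Hadoop or Avro.
final class ClassOrigin {
    private ClassOrigin() {}

    // Returns the jar or directory the named class was loaded from, as seen by the given (non-null) loader.
    static String describe(String className, ClassLoader loader) {
        try {
            Class<?> c = Class.forName(className, false, loader);
            CodeSource src = c.getProtectionDomain().getCodeSource();
            // JDK classes loaded by the bootstrap loader carry no CodeSource.
            if (src == null || src.getLocation() == null) {
                return "bootstrap/unknown";
            }
            return src.getLocation().toString();
        } catch (ClassNotFoundException e) {
            return "not found";
        }
    }

    // Lists every classpath entry that contains the class file; more than one entry means duplicate copies.
    static List<String> copiesOf(String className, ClassLoader loader) {
        String resource = className.replace('.', '/') + ".class";
        List<String> locations = new ArrayList<>();
        try {
            for (URL url : Collections.list(loader.getResources(resource))) {
                locations.add(url.toString());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return locations;
    }
}
```

For example, logging ClassOrigin.copiesOf("org.apache.avro.generic.GenericData", RollupAvroFilesMapper.class.getClassLoader()) from the mapper's setup() would show every Avro jar the task can see.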
The Mapper class is:
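package org.lab41.cyprus.mapreduce;

import java.io.IOException;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.lab41.cyprus.domain.NetworkRecord;

public class RollupAvroFilesMapper
        extends Mapper<AvroKey<NetworkRecord>, NullWritable, AvroKey<Long>,
        AvroValue<NetworkRecord>> {
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
    }

    @Override
    protected void map(AvroKey<NetworkRecord> key, NullWritable value, Context context)
            throws IOException, InterruptedException {
        // The ClassCastException is thrown here: key.datum() is a GenericData.Record, not a NetworkRecord
        NetworkRecord record = key.datum();
        // ….
    }
}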
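The cast that fails at RollupAvroFilesMapper.java:45 is presumably the implicit one where key.datum() is assigned to a NetworkRecord. As a hedged diagnostic sketch in plain Java (DatumCast and as are hypothetical helpers, not Avro or Hadoop API), the cast can be made explicit so the error names both the class that actually arrived and the class loaders involved:

```java
// Hypothetical diagnostic helper; not part of Hadoop or Avro.
final class DatumCast {
    private DatumCast() {}

    // Casts datum to expected, or fails with a message naming both classes and their class loaders.
    static <T> T as(Object datum, Class<T> expected) {
        if (expected.isInstance(datum)) {
            return expected.cast(datum);
        }
        String actualName = (datum == null) ? "null" : datum.getClass().getName();
        ClassLoader actualLoader = (datum == null) ? null : datum.getClass().getClassLoader();
        throw new IllegalStateException("expected " + expected.getName()
                + " (loader " + expected.getClassLoader() + ") but got " + actualName
                + " (loader " + actualLoader + ")");
    }
}
```

In the mapper this could be used as Object datum = ((AvroKey<?>) key).datum(); followed by NetworkRecord record = DatumCast.as(datum, NetworkRecord.class); reading through the wildcard type keeps javac from inserting its own NetworkRecord cast before the helper runs.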
Any ideas would be greatly appreciated.