
I'm trying to write a Mapper (using the .mapreduce API) that consumes Avro 1.7.4 files. I'm generating my objects from an .avdl file using the specific API (via the Maven Avro plugin).

I have verified that the Avro file is generated correctly and that I can read it back using the avro-to-json function in the Avro tools jar.

The error I'm getting is:

java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to org.lab41.cyprus.domain.NetworkRecord
    at org.lab41.cyprus.mapreduce.RollupAvroFilesMapper.map(RollupAvroFilesMapper.java:45)
    at org.lab41.cyprus.mapreduce.RollupAvroFilesMapper.map(RollupAvroFilesMapper.java:23)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:140)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:645)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:325)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:263)
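The trace says the record reader handed the mapper a GenericData$Record instead of a NetworkRecord. One possible explanation, not confirmed for this job: as I understand it, Avro's specific reader looks up the generated class by name through a ClassLoader, and if that lookup fails it quietly falls back to a generic record rather than throwing. The plain-Java sketch below only mimics that pattern; FallbackDemo, SpecificRecordStandIn, GenericRecordStandIn and newRecord are hypothetical stand-ins, not Avro code.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.HashMap;

public class FallbackDemo {

    // Hypothetical stand-in for a generated specific class such as NetworkRecord.
    public static class SpecificRecordStandIn { }

    // Hypothetical stand-in for org.apache.avro.generic.GenericData.Record.
    public static class GenericRecordStandIn extends HashMap<String, Object> { }

    // Mimics "look up the generated class by name, else fall back to a generic record".
    public static Object newRecord(ClassLoader loader, String className) {
        try {
            Class<?> c = Class.forName(className, true, loader);
            return c.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            // Class not visible to this loader: no error, just a generic record.
            return new GenericRecordStandIn();
        }
    }

    public static void main(String[] args) throws Exception {
        String name = SpecificRecordStandIn.class.getName();

        // A loader that can see the application classes yields the specific type.
        Object ok = newRecord(FallbackDemo.class.getClassLoader(), name);
        System.out.println(ok.getClass().getSimpleName());

        // A loader backed only by the bootstrap loader cannot see the class.
        try (URLClassLoader isolated = new URLClassLoader(new URL[0], null)) {
            Object fallback = newRecord(isolated, name);
            System.out.println(fallback.getClass().getSimpleName());
            // The cast is where the failure finally surfaces, far from its cause.
            SpecificRecordStandIn r = (SpecificRecordStandIn) fallback;
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: " + e.getMessage());
        }
    }
}
```

The point of the sketch is that the failure shows up at the cast in user code, not at the place where the class lookup went wrong.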

The driver class is as follows:

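import org.apache.avro.Schema;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.lab41.cyprus.domain.NetworkRecord;

public class RollupAvroFiles extends Configured implements Tool {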

@Override
public int run(String[] args) throws Exception {
    Configuration conf = getConf();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

    String input, output;
    if (otherArgs.length == 2) {
        input = otherArgs[0];
        output = otherArgs[1];
    } else {
        return 1;
    }

    /** configure Job **/
    Job job = new Job(conf, "RollupAvroFiles");
    job.setJarByClass(RollupAvroFiles.class);
    job.setUserClassesTakesPrecedence(true);
    job.setNumReduceTasks(1);

    FileInputFormat.setInputPaths(job, new Path(input));
    job.setInputFormatClass(AvroKeyInputFormat.class);
    AvroJob.setInputKeySchema(job, NetworkRecord.SCHEMA$);


    job.setMapperClass(RollupAvroFilesMapper.class);
    AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.LONG));
    AvroJob.setMapOutputValueSchema(job, NetworkRecord.SCHEMA$);

    job.setMapOutputKeyClass(AvroKey.class);
    job.setMapOutputValueClass(AvroValue.class);

    job.setReducerClass(RollupAvroFilesReducer.class);
    AvroJob.setOutputKeySchema(job, NetworkRecord.SCHEMA$);

    job.setOutputFormatClass(AvroKeyOutputFormat.class);
    FileOutputFormat.setOutputPath(job, new Path(output));

    return job.waitForCompletion(true) ? 0 : 1;
}

public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new RollupAvroFiles(), args);
    System.exit(exitCode);
}

}
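The driver calls job.setUserClassesTakesPrecedence(true), which as I understand it asks the framework to prefer classes from the job's own jars over those already on Hadoop's classpath. The general "child-first" idea behind such a setting can be sketched with a URLClassLoader subclass. ChildFirstClassLoader is a hypothetical illustration of the technique, not a Hadoop class and not how Hadoop implements this option.

```java
import java.net.URL;
import java.net.URLClassLoader;

// Hypothetical child-first loader: looks in its own URLs before asking the parent.
public class ChildFirstClassLoader extends URLClassLoader {

    public ChildFirstClassLoader(URL[] urls, ClassLoader parent) {
        super(urls, parent);
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            Class<?> c = findLoadedClass(name);
            if (c == null) {
                if (name.startsWith("java.")) {
                    // Core JDK classes must always come from the parent chain.
                    c = super.loadClass(name, false);
                } else {
                    try {
                        // Child first: search this loader's own jars/directories.
                        c = findClass(name);
                    } catch (ClassNotFoundException e) {
                        // Not found locally: fall back to normal parent delegation.
                        c = super.loadClass(name, false);
                    }
                }
            }
            if (resolve) {
                resolveClass(c);
            }
            return c;
        }
    }
}
```

With a loader like this, a copy of a library in the user's jars wins over a different version visible through the parent, which is the kind of conflict the driver setting is meant to avoid.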

The Mapper class is as follows:

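package org.lab41.cyprus.mapreduce;

import java.io.IOException;

import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.lab41.cyprus.domain.NetworkRecord;

public class RollupAvroFilesMapper
        extends Mapper<AvroKey<NetworkRecord>, NullWritable, AvroKey<Long>, AvroValue<NetworkRecord>> {
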
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
    }

    @Override
    protected void map(AvroKey<NetworkRecord> key, NullWritable value, Context context)
            throws IOException, InterruptedException {
        NetworkRecord record = key.datum();
….
    }
}
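To see which jar and class loader actually supplied the classes involved, a small JDK-only helper such as the hypothetical ClassOrigin below could be called from setup() or map(), for example on key.datum().getClass(), NetworkRecord.class and org.apache.avro.specific.SpecificData.class. If the Avro classes turn out to come from a different jar or loader than the job jar (say, an older Avro bundled with Hadoop), that would point to a classpath problem rather than a problem with the data.

```java
import java.security.CodeSource;

// Hypothetical diagnostic helper: reports which loader and location a class came from.
public final class ClassOrigin {

    private ClassOrigin() { }

    // Returns "<class> loaded by <loader> from <jar or directory>".
    public static String describe(Class<?> c) {
        ClassLoader loader = c.getClassLoader();
        // Core JDK classes are loaded by the bootstrap loader, which is reported as null.
        String loaderName = (loader == null) ? "bootstrap" : loader.toString();
        CodeSource src = c.getProtectionDomain().getCodeSource();
        // Bootstrap classes usually have no CodeSource, so report "unknown" for them.
        String location = (src == null || src.getLocation() == null)
                ? "unknown" : src.getLocation().toString();
        return c.getName() + " loaded by " + loaderName + " from " + location;
    }

    public static void main(String[] args) {
        System.out.println(describe(String.class));
        System.out.println(describe(ClassOrigin.class));
    }
}
```

In the mapper this might look like System.err.println(ClassOrigin.describe(NetworkRecord.class)) inside setup(), with the output ending up in the task logs.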

Any ideas would be greatly appreciated.
