
All of my programs are written with Hadoop's new MR1 interfaces (org.apache.hadoop.mapreduce), so I want to use Avro's new org.apache.avro.mapreduce package too. But it doesn't work for me.

The program takes Avro data as input and writes Avro data as output. The main idea is to subclass Hadoop's Mapper and Reducer against Avro-wrapped keys and values. Here is a block of my job driver:

    AvroJob.setInputKeySchema(job, NetflowRecord.getClassSchema());
    AvroJob.setOutputKeySchema(job, NetflowRecord.getClassSchema());

    job.setMapperClass(MyAvroMap.class);
    job.setReducerClass(MyAvroReduce.class);

    job.setInputFormatClass(AvroKeyInputFormat.class);
    job.setOutputFormatClass(AvroKeyOutputFormat.class);

    job.setMapOutputKeyClass(AvroKey.class);
    job.setMapOutputValueClass(AvroValue.class);

    job.setOutputKeyClass(AvroKey.class);
    job.setOutputValueClass(NullWritable.class);

The definitions of the MyAvroMap and MyAvroReduce subclasses are, respectively:

    public static class MyAvroMap extends Mapper<AvroKey<NetflowRecord>, NullWritable,
            AvroKey<CharSequence>, AvroValue<NetflowRecord>> { ... }

    public static class MyAvroReduce extends Reducer<AvroKey<CharSequence>, AvroValue<NetflowRecord>,
            AvroKey<NetflowRecord>, NullWritable> { ... }

NetflowRecord is my Avro record class. At runtime I get this exception:

    java.lang.ClassCastException: class org.apache.avro.hadoop.io.AvroKey

By reading the Hadoop and Avro source code, I found that the exception is thrown by JobConf, which checks that the map output key class is a subclass of WritableComparable, like this (Hadoop 1.2.1, line 759):

    WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class));

But the Avro source shows that AvroKey and AvroValue are just simple wrappers that do not implement any of Hadoop's Writable* interfaces.
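To make the failure mode concrete, here is a minimal plain-Java sketch of what `asSubclass` does in that check. The types `WritableComparableLike`, `TextLike` and `AvroKeyLike` are hypothetical stand-ins invented for this illustration, not the real Hadoop or Avro classes; only `Class.asSubclass` is the actual JDK call.

```java
public class AsSubclassDemo {
    // Hypothetical stand-in for Hadoop's WritableComparable interface.
    interface WritableComparableLike {}

    // Hypothetical key type that implements the interface (plays the role of Text).
    static class TextLike implements WritableComparableLike {}

    // Hypothetical plain wrapper that implements nothing (plays the role of AvroKey).
    static class AvroKeyLike {}

    // Same shape as the JobConf check: asSubclass throws ClassCastException
    // when keyClass is not a subtype of the requested interface.
    static boolean passesCheck(Class<?> keyClass) {
        try {
            keyClass.asSubclass(WritableComparableLike.class);
            return true;
        } catch (ClassCastException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("TextLike passes: " + passesCheck(TextLike.class));
        System.out.println("AvroKeyLike passes: " + passesCheck(AvroKeyLike.class));
    }
}
```

The wrapper class fails the cast simply because it is not declared as a subtype of the interface, independent of what it wraps.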

I believe, even without testing it, that I could get this working with the old mapred interfaces, but that's not what I want. Can you give me some examples or an explanation of programming with the pure org.apache.avro.mapreduce interfaces?

Sincerely,

Jamin


1 Answer


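After a hard search, and with the help of this patch https://issues.apache.org/jira/browse/AVRO-593, I figured out that every AvroKey and AvroValue wrapper must have a schema definition in the job configuration. That is what I was missing.

Here I have two options: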

  1. If I keep MyAvroMap and MyAvroReduce unchanged, I have to define a schema for CharSequence and declare it for the Mapper output with AvroJob, for example

    AvroJob.setMapOutputKeySchema(job, <"defined-schema-for-charsequence">);
    AvroJob.setMapOutputValueSchema(job, NetflowRecord.getClassSchema());

  2. If I change the Mapper output key/value to Text/AvroValue, I only need to add a schema declaration for the Mapper output value, for example

    job.setMapOutputKeyClass(Text.class);
    AvroJob.setMapOutputValueSchema(job, NetflowRecord.getClassSchema());

With the mapreduce API, we no longer need to subclass AvroMapper and AvroReducer. Here I implemented option 2 in my code, with no additional schema definition.
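Put together, the option 2 job setup might look roughly like the sketch below. It needs the Avro mapred and Hadoop jars on the classpath. The class name `Option2JobSetup` and the `configure` helper are hypothetical names made up for this example. The `recordSchema` argument stands for `NetflowRecord.getClassSchema()`, and the mapper is assumed to be declared as `Mapper<AvroKey<NetflowRecord>, NullWritable, Text, AvroValue<NetflowRecord>>`, with the reducer's input types changed to match.

```java
import org.apache.avro.Schema;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public final class Option2JobSetup {
    private Option2JobSetup() {}

    // Hypothetical helper: the caller still sets the mapper and reducer
    // classes and the input/output paths on the job.
    public static void configure(Job job, Schema recordSchema) {
        // Input: Avro container files read as AvroKey<record>, NullWritable.
        job.setInputFormatClass(AvroKeyInputFormat.class);
        AvroJob.setInputKeySchema(job, recordSchema);

        // Map output: an ordinary Writable key, so only the Avro value
        // needs a schema declared in the job configuration.
        job.setMapOutputKeyClass(Text.class);
        AvroJob.setMapOutputValueSchema(job, recordSchema);

        // Final output: AvroKey<record>, NullWritable written as Avro files.
        job.setOutputFormatClass(AvroKeyOutputFormat.class);
        AvroJob.setOutputKeySchema(job, recordSchema);
        job.setOutputValueClass(NullWritable.class);
    }
}
```

Because the map output key is Text, the WritableComparable check on the key class passes without any Avro schema for the key.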

Jamin

Answered 2013-10-02T17:12:56.703