0

我正在编写一个自定义key类,没有hashCode实现。

我运行一个map-reduce作业,但是在作业配置过程中,我设置了partitoner类:比如

        Job job = Job.getInstance(config);
        job.setJarByClass(ReduceSideJoinDriver.class);

        FileInputFormat.addInputPaths(job, filePaths.toString());
        FileOutputFormat.setOutputPath(job, new Path(args[args.length-1]));

        job.setMapperClass(JoiningMapper.class);
        job.setReducerClass(JoiningReducer.class);
        job.setPartitionerClass(TaggedJoiningPartitioner.class); -- Here is the partitioner set
        job.setGroupingComparatorClass(TaggedJoiningGroupingComparator.class);
        job.setOutputKeyClass(TaggedKey.class);
        job.setOutputValueClass(Text.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);

这是partitioner实现:

public class TaggedJoiningPartitioner extends Partitioner<TaggedKey,Text> {

    @Override
    public int getPartition(TaggedKey taggedKey, Text text, int numPartitions) {
        return Math.abs(taggedKey.getJoinKey().hashCode()) % numPartitions;
    }
}

我运行map-reduce作业并保存输出。

job.setPartitionerClass(TaggedJoiningPartitioner.class);现在我在上面的工作设置中注释掉了。

hashCode()在我的自定义类中实现如下:

public class TaggedKey implements Writable, WritableComparable<TaggedKey> {

    private Text joinKey = new Text();
    private IntWritable tag = new IntWritable();

    @Override
    public int compareTo(TaggedKey taggedKey) {
        int compareValue = this.joinKey.compareTo(taggedKey.getJoinKey());
        if(compareValue == 0 ){
            compareValue = this.tag.compareTo(taggedKey.getTag());
        }
       return compareValue;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        joinKey.write(out);
        tag.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        joinKey.readFields(in);
        tag.readFields(in);
    }

    @Override
    public int hashCode(){
        return joinKey.hashCode();
    }

    @Override
    public boolean equals(Object o){
        if (this==o)
            return true;
        if (!(o instanceof TaggedKey)){
            return false;
        }
        TaggedKey that=(TaggedKey)o;
        return this.joinKey.equals(that.joinKey);
    }
}

现在我再次运行该作业(注意:我没有任何partitoner设置)。在 map-reduce 工作之后,我比较了前一个的输出。它们完全相同。

所以我的问题是:

   1)  Is this behavior universal, that is always reproducible in any
        custom implementations? 

    2) Does implementing hashcode on my key class is same as doing a
    job.setPartitionerClass.

    3) If they both serve same purpose, what is the need for
    setPartitonerClass?

    4) if both hashcode() implementation and Partitonerclass
    implementation are conflicting, which one will take precedence?
4

1 回答 1

0

您会得到相同的结果,因为您的自定义分区程序正在执行默认分区程序所做的事情。您只是将代码移动到另一个类并在那里执行它。放入不同的逻辑,如 key().toString().length() % numPartitions 或获取 hashcode() % numPartitions 以外的其他逻辑,您将看到减速器的键分布不同。

例如,您不能仅通过编辑 hashcode() 来获取此分区器

公共静态类 MyPartitioner 扩展 Partitioner {

    @Override
    public int getPartition(Text key, Text value, int numReduceTasks) {

        int len = key.value().length;

        if(numReduceTasks == 0)
            return 0;

        if(len <=numReduceTasks/3){               
            return 0;
        }
        if(len >numReduceTasks/3 && len <=numReduceTasks/2){

            return 1 % numReduceTasks;
        }
        else
            return len % numReduceTasks;
    }
}
于 2014-07-18T01:41:40.773 回答