我在 hadoop 中做了一个练习,用于对对象“IntPair”进行排序,它是 2 个整数的组合。这是输入文件:
2,9
3,8
2,6
3,2
...
“IntPair”类是这样的:
static class IntPair implements WritableComparable<IntPair> {
private int first;
private int second;
...
public int compareTo(IntPair o) {
return (this.first==o.first)?(this.second==o.second?0:(this.second>o.second?1:-1)):(this.first>o.first?1:-1);
}
public static int compare(int a, int b) {
return (a==b)?0:((a>b)?1:-1);
}
...
}
在 Mapper 中,我使用 inputFormat 和 outputKey/Value,并创建 IntPair 实例,每行有 2 个整数:
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String v[] = value.toString().split(",");
IntPair k = new IntPair(Integer.parseInt(v[0]), Integer.parseInt(v[1]));
context.write(k, NullWritable.get());
}
我根据第一个整数对映射器结果进行分区,并根据第一个整数创建组比较器。只有排序比较器基于两个整数。
static class FirstPartitioner extends Partitioner<IntPair, NullWritable> {
public int getPartition(IntPair key, NullWritable value, int numPartitions) {
return Math.abs(key.getFirst()*127)%numPartitions;
}
}
static class BothComparator extends WritableComparator {
public int compare(WritableComparable w1, WritableComparable w2) {
IntPair p1 = (IntPair)w1;
IntPair p2 = (IntPair)w2;
int cmp = IntPair.compare(p1.getFirst(), p2.getFirst());
if(cmp != 0) {
return cmp;
}
return -IntPair.compare(p1.getSecond(), p2.getSecond());//reverse sort
}
}
static class FirstGroupComparator extends WritableComparator {
public int compare(WritableComparable w1, WritableComparable w2) {
IntPair p1 = (IntPair)w1;
IntPair p2 = (IntPair)w2;
return IntPair.compare(p1.getFirst(), p2.getFirst());
}
}
在 Reducer 中,我只是将 IntPair 作为键输出,将 NullWritable 作为值输出:
static class SSReducer extends Reducer<IntPair, NullWritable, IntPair, NullWritable> {
protected void reduce(IntPair key, Iterable<NullWritable> values,
Context context)throws IOException, InterruptedException {
context.write(key, NullWritable.get());
}
}
运行hadoop后,我得到以下结果:
2,9
3,8
早些时候,我认为reducer 应该通过key(IntPair) 对记录进行分组。由于每条记录代表一个不同的键,所以每条记录都会调用一次“reduce”方法,在这种情况下,结果应该是:
2,9
2,6
3,8
3,2
所以我认为存在差异是因为组比较器,因为它只使用第一个整数进行比较。所以在 reducer 中,记录按第一个整数分组。在此示例中,这意味着 2 条记录中的每条记录都调用一次“reduce”,因此在不循环的情况下,它只生成每组的第一条记录。这样对吗?另外,我做了另一个实验,将减速器更改如下:
static class SSReducer extends Reducer<IntPair, NullWritable, IntPair, NullWritable> {
protected void reduce(IntPair key, Iterable<NullWritable> values,
Context context)throws IOException, InterruptedException {
for(NullWritable n : values) //add looping
context.write(key, NullWritable.get());
}
}
然后它产生有 4 个项目的结果。
如果我将 groupcomarator 更改为使用两个整数进行比较,它也会产生 4 个项目。所以reducer实际上使用groupcomparator对keys进行分组,这意味着即使keys不同,一个组中的每条记录都会调用一次'reduce'。