3

我在 hadoop 中做了一个练习,用于对对象“IntPair”进行排序,它是 2 个整数的组合。这是输入文件:

2,9
3,8
2,6
3,2
...

“IntPair”类是这样的:

static class IntPair implements WritableComparable<IntPair> {
    private int first;
    private int second;   
       ...
   public int compareTo(IntPair o) {
       return (this.first==o.first)?(this.second==o.second?0:(this.second>o.second?1:-1)):(this.first>o.first?1:-1);
    }
   public static int compare(int a, int b) {
   return (a==b)?0:((a>b)?1:-1);
   }
       ...  
}

在 Mapper 中,我使用 inputFormat 和 outputKey/Value,并创建 IntPair 实例,每行有 2 个整数:

protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
            String v[] = value.toString().split(",");
            IntPair k = new IntPair(Integer.parseInt(v[0]), Integer.parseInt(v[1]));
            context.write(k, NullWritable.get());

        }

我根据第一个整数对映射器结果进行分区,并根据第一个整数创建组比较器。只有排序比较器基于两个整数。

static class FirstPartitioner extends Partitioner<IntPair, NullWritable> {

    public int getPartition(IntPair key, NullWritable value, int numPartitions) {
            return Math.abs(key.getFirst()*127)%numPartitions;
        }
}
static class BothComparator extends WritableComparator {
    public int compare(WritableComparable w1, WritableComparable w2) {
            IntPair p1 = (IntPair)w1;
            IntPair p2 = (IntPair)w2;
            int cmp = IntPair.compare(p1.getFirst(), p2.getFirst());
            if(cmp != 0) {
                return cmp;
            }
            return -IntPair.compare(p1.getSecond(), p2.getSecond());//reverse sort
    }

}

static class FirstGroupComparator extends WritableComparator {
    public int compare(WritableComparable w1, WritableComparable w2) {
            IntPair p1 = (IntPair)w1;
            IntPair p2 = (IntPair)w2;
            return IntPair.compare(p1.getFirst(), p2.getFirst());
    }
}

在 Reducer 中,我只是将 IntPair 作为键输出,将 NullWritable 作为值输出:

static class SSReducer extends Reducer<IntPair, NullWritable, IntPair, NullWritable> {
        protected void reduce(IntPair key, Iterable<NullWritable> values,
            Context context)throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
}

运行hadoop后,我得到以下结果:

   2,9
   3,8

早些时候,我认为reducer 应该通过key(IntPair) 对记录进行分组。由于每条记录代表一个不同的键,所以每条记录都会调用一次“reduce”方法,在这种情况下,结果应该是:

2,9
2,6
3,8
3,2

所以我认为存在差异是因为组比较器,因为它只使用第一个整数进行比较。所以在 reducer 中,记录按第一个整数分组。在此示例中,这意味着 2 条记录中的每条记录都调用一次“reduce”,因此在不循环的情况下,它只生成每组的第一条记录。这样对吗?另外,我做了另一个实验,将减速器更改如下:

static class SSReducer extends Reducer<IntPair, NullWritable, IntPair, NullWritable> {
     protected void reduce(IntPair key, Iterable<NullWritable> values,
                Context context)throws IOException, InterruptedException {
                        for(NullWritable n : values) //add looping
                   context.write(key, NullWritable.get());
            }
    }

然后它产生有 4 个项目的结果。

如果我将 groupcomarator 更改为使用两个整数进行比较,它也会产生 4 个项目。所以reducer实际上使用groupcomparator对keys进行分组,这意味着即使keys不同,一个组中的每条记录都会调用一次'reduce'。

4

2 回答 2

3

是的,即使键不同,一组中的每条记录也会调用一次“减少”。实际上,每个组调用reduce方法一次,组中的第一个键为'KEY',组中的所有值形成reduce方法的值。

即使我们在 reduce 方法中只有一个键(第一个键)并且所有值都是可迭代的,但您可以看到,在迭代时,我们将获得可迭代内部值的对应键。

首先,我们使用两个键进入 groupcomparator,reduce 方法启动并从迭代器内部再次调用具有另外两个键的组比较器。

这意味着reducer事先不知道它的可迭代值。它是在迭代可迭代值时确定的。

因此,如果我们不迭代值,我们只会看到组的第一个键。如果我们迭代值,我们将获得所有键。

于 2013-04-05T12:04:25.137 回答
0

你的理解是正确的。键的“复合值”对进入 reducer 的分组没有影响。比较器的特定行为和它们所查看的特定字段使人受到尊重..

于 2012-10-17T11:26:01.157 回答