10

我的问题是对文件中的值进行排序。键和值是整数,需要维护排序值的键。

key   value
1     24
3     4
4     12
5     23

输出:

1     24
5     23
4     12
3     4

我正在处理大量数据,并且必须在一组 hadoop 机器中运行代码。我怎么能用mapreduce做到这一点?

4

1 回答 1

18

您可能可以这样做(我假设您在这里使用 Java)

从地图发出这样的 -

context.write(24,1);
context.write(4,3);
context.write(12,4)
context.write(23,5)

因此,所有需要排序的值都应该是 mapreduce 作业中的关键。Hadoop 默认按 key 的升序排序。

因此,要么你这样做以降序排序,

job.setSortComparatorClass(LongWritable.DecreasingComparator.class);

或这个,

您需要设置一个自定义降序比较器,它在您的工作中是这样的。

public static class DescendingKeyComparator extends WritableComparator {
    protected DescendingKeyComparator() {
        super(Text.class, true);
    }

    @SuppressWarnings("rawtypes")
    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        LongWritable key1 = (LongWritable) w1;
        LongWritable key2 = (LongWritable) w2;          
        return -1 * key1.compareTo(key2);
    }
}

Hadoop 中的 suffle 和排序阶段将按照降序对键进行排序 24,4,12,23

评论后:

如果您需要降序 IntWritable Comparable,您可以创建一个并像这样使用它 -

job.setSortComparatorClass(DescendingIntComparable.class);

如果您使用的是JobConf,请使用它来设置

jobConfObject.setOutputKeyComparatorClass(DescendingIntComparable.class);

将以下代码放在您的main()函数下方 -

public static void main(String[] args) {
    int exitCode = ToolRunner.run(new YourDriver(), args);
    System.exit(exitCode);
}

//this class is defined outside of main not inside
public static class DescendingIntWritableComparable extends IntWritable {
    /** A decreasing Comparator optimized for IntWritable. */ 
    public static class DecreasingComparator extends Comparator {
        public int compare(WritableComparable a, WritableComparable b) {
            return -super.compare(a, b);
        }
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            return -super.compare(b1, s1, l1, b2, s2, l2);
        }
    }
}
于 2013-08-09T22:11:43.457 回答