我的问题是对文件中的值进行排序。键和值是整数,需要维护排序值的键。
key value
1 24
3 4
4 12
5 23
输出:
1 24
5 23
4 12
3 4
我正在处理大量数据,并且必须在一组 hadoop 机器中运行代码。我怎么能用mapreduce做到这一点?
您可能可以这样做(我假设您在这里使用 Java)
从地图发出这样的 -
context.write(24,1);
context.write(4,3);
context.write(12,4)
context.write(23,5)
因此,所有需要排序的值都应该是 mapreduce 作业中的关键。Hadoop 默认按 key 的升序排序。
因此,要么你这样做以降序排序,
job.setSortComparatorClass(LongWritable.DecreasingComparator.class);
或这个,
您需要设置一个自定义降序比较器,它在您的工作中是这样的。
public static class DescendingKeyComparator extends WritableComparator {
protected DescendingKeyComparator() {
super(Text.class, true);
}
@SuppressWarnings("rawtypes")
@Override
public int compare(WritableComparable w1, WritableComparable w2) {
LongWritable key1 = (LongWritable) w1;
LongWritable key2 = (LongWritable) w2;
return -1 * key1.compareTo(key2);
}
}
Hadoop 中的 suffle 和排序阶段将按照降序对键进行排序 24,4,12,23
评论后:
如果您需要降序 IntWritable Comparable,您可以创建一个并像这样使用它 -
job.setSortComparatorClass(DescendingIntComparable.class);
如果您使用的是JobConf,请使用它来设置
jobConfObject.setOutputKeyComparatorClass(DescendingIntComparable.class);
将以下代码放在您的main()
函数下方 -
public static void main(String[] args) {
int exitCode = ToolRunner.run(new YourDriver(), args);
System.exit(exitCode);
}
//this class is defined outside of main not inside
public static class DescendingIntWritableComparable extends IntWritable {
/** A decreasing Comparator optimized for IntWritable. */
public static class DecreasingComparator extends Comparator {
public int compare(WritableComparable a, WritableComparable b) {
return -super.compare(a, b);
}
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
return -super.compare(b1, s1, l1, b2, s2, l2);
}
}
}