这是一个非常好的问题,因为您遇到了 Hadoop 字数统计示例的低效率问题。
优化问题的技巧如下:
在本地地图阶段进行HashMap
基于分组,您也可以为此使用组合器。这看起来像这样,我使用的HashMultiSet
是 Guava,它有利于一个很好的计数机制。
public static class WordFrequencyMapper extends
Mapper<LongWritable, Text, Text, LongWritable> {
private final HashMultiset<String> wordCountSet = HashMultiset.create();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] tokens = value.toString().split("\\s+");
for (String token : tokens) {
wordCountSet.add(token);
}
}
然后在清理阶段发出结果:
@Override
protected void cleanup(Context context) throws IOException,
InterruptedException {
Text key = new Text();
LongWritable value = new LongWritable();
for (Entry<String> entry : wordCountSet.entrySet()) {
key.set(entry.getElement());
value.set(entry.getCount());
context.write(key, value);
}
}
因此,您已将单词分组到本地工作块中,从而通过使用一点 RAM 来减少网络使用。你也可以对 a 做同样的事情Combiner
,但它是按组排序的,所以这会比使用 a 慢(尤其是字符串!)HashMultiset
。
要获得 Top N,您只需将 Top N 写入HashMultiset
输出收集器的本地,并在 reduce 端以正常方式聚合结果。这也为您节省了大量的网络带宽,唯一的缺点是您需要在清理方法中对字数元组进行排序。
部分代码可能如下所示:
Set<String> elementSet = wordCountSet.elementSet();
String[] array = elementSet.toArray(new String[elementSet.size()]);
Arrays.sort(array, new Comparator<String>() {
@Override
public int compare(String o1, String o2) {
// sort descending
return Long.compare(wordCountSet.count(o2), wordCountSet.count(o1));
}
});
Text key = new Text();
LongWritable value = new LongWritable();
// just emit the first n records
for(int i = 0; i < N, i++){
key.set(array[i]);
value.set(wordCountSet.count(array[i]));
context.write(key, value);
}
希望您了解在本地做尽可能多的单词的要点,然后只聚合前 N 个中的前 N 个;)