9

我正在研究类似于规范 MapReduce 示例的东西 - 字数统计,但有一个转折点,我希望只获得前 N个结果。

假设我在 HDFS 中有大量的文本数据。有很多示例展示了如何构建 Hadoop MapReduce 作业,该作业将为您提供该文本中每个单词的字数统计。例如,如果我的语料库是:

“这是对测试数据的测试,也是测试这一点的好方法”

标准 MapReduce 字数统计作业的结果集将是:

test:3, a:2, this:2, is: 1, etc..

但是,如果我只想获得在我的整个数据集中使用的前 3 个单词怎么办?

我仍然可以运行完全相同的标准 MapReduce 字数计数作业,然后在它准备好并吐出每个字的计数后只取前 3 个结果,但这似乎有点低效,因为需要大量数据在洗牌阶段四处移动。

我在想的是,如果这个样本足够大,并且数据在 HDFS 中随机分布良好,那么每个 Mapper 不需要将其所有字数发送到 Reducers,而是只需要顶部数据。所以如果一个映射器有这个:

a:8234, the: 5422, man: 4352, ...... 还有更多的词......,稀有词:1,怪异词:1,等等。

然后我想做的只是将每个 Mapper 的前 100 个左右的单词发送到 Reducer 阶段——因为当一切都说完后,“稀有词”突然进入前 3 名的可能性很小。这似乎可以节省带宽和 Reducer 处理时间。

这可以在组合器阶段完成吗?这种在洗牌阶段之前的优化是否普遍进行?

4

2 回答 2

7

这是一个非常好的问题,因为您遇到了 Hadoop 字数统计示例的低效率问题。

优化问题的技巧如下:

在本地地图阶段进行HashMap基于分组,您也可以为此使用组合器。这看起来像这样,我使用的HashMultiSet是 Guava,它有利于一个很好的计数机制。

    public static class WordFrequencyMapper extends
      Mapper<LongWritable, Text, Text, LongWritable> {

    private final HashMultiset<String> wordCountSet = HashMultiset.create();

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {

      String[] tokens = value.toString().split("\\s+");
      for (String token : tokens) {
        wordCountSet.add(token);
      }
    }

然后在清理阶段发出结果:

@Override
protected void cleanup(Context context) throws IOException,
    InterruptedException {
  Text key = new Text();
  LongWritable value = new LongWritable();
  for (Entry<String> entry : wordCountSet.entrySet()) {
    key.set(entry.getElement());
    value.set(entry.getCount());
    context.write(key, value);
  }
}

因此,您已将单词分组到本地工作块中,从而通过使用一点 RAM 来减少网络使用。你也可以对 a 做同样的事情Combiner,但它是按组排序的,所以这会比使用 a 慢(尤其是字符串!)HashMultiset

要获得 Top N,您只需将 Top N 写入HashMultiset输出收集器的本地,并在 reduce 端以正常方式聚合结果。这也为您节省了大量的网络带宽,唯一的缺点是您需要在清理方法中对字数元组进行排序。

部分代码可能如下所示:

  Set<String> elementSet = wordCountSet.elementSet();
  String[] array = elementSet.toArray(new String[elementSet.size()]);
  Arrays.sort(array, new Comparator<String>() {

    @Override
    public int compare(String o1, String o2) {
      // sort descending
      return Long.compare(wordCountSet.count(o2), wordCountSet.count(o1));
    }

  });
  Text key = new Text();
  LongWritable value = new LongWritable();
  // just emit the first n records
  for(int i = 0; i < N, i++){
    key.set(array[i]);
    value.set(wordCountSet.count(array[i]));
    context.write(key, value);
  }

希望您了解在本地做尽可能多的单词的要点,然后只聚合前 N 个中的前 N ​​个;)

于 2012-11-28T16:58:53.890 回答
6

引用托马斯

要获得 Top N,您只需将该本地 HashMultiset 中的 Top N 写入输出收集器,并在 reduce 端以正常方式聚合结果。这也为您节省了大量的网络带宽,唯一的缺点是您需要在清理方法中对字数元组进行排序。

如果您只在本地 HashMultiset 中写入前 N 个元素,那么您可能会错过一个元素的计数,如果从这个本地 HashMultiset 传递,它可能成为整体前 10 个元素之一。

例如,考虑以下格式作为 MapName 的三个映射: elementName,elementntcount:

地图A:Ele1,4:Ele2,5:Ele3,5:Ele4,2

地图 B : Ele1,1 : Ele5,7 : Ele6, 3 : Ele7,6

地图 C : Ele5,4 : Ele8,3 : Ele1,1 : Ele9,3

现在,如果我们考虑每个映射器的前 3 个,我们将错过总计数应该为 6 的元素“Ele1”,但由于我们正在计算每个映射器的前 3 个,我们看到“Ele1”的总计数为 4。

我希望这是有道理的。请让我知道您对此有何看法。

于 2014-04-14T05:59:20.540 回答