1

我的输出是:

word , file ----- ------ wordx Doc2, Doc1, Doc1, Doc1, Doc1, Doc1, Doc1, Doc1

我想要的是:

word , file ----- ------ wordx Doc2, Doc1

public static class LineIndexMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, Text> {

    private final static Text word = new Text();
    private final static Text location = new Text();

    public void map(LongWritable key, Text val,
            OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {
        FileSplit fileSplit = (FileSplit) reporter.getInputSplit();
        String fileName = fileSplit.getPath().getName();
        location.set(fileName);

        String line = val.toString();
        StringTokenizer itr = new StringTokenizer(line.toLowerCase());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            output.collect(word, location);
        }
    }
}

public static class LineIndexReducer extends MapReduceBase
        implements Reducer<Text, Text, Text, Text> {

    public void reduce(Text key, Iterator<Text> values,
            OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {

        boolean first = true;
        StringBuilder toReturn = new StringBuilder();
        while (values.hasNext()) {
            if (!first) {
                toReturn.append(", ");
            }
            first = false;
            toReturn.append(values.next().toString());
        }

        output.collect(key, new Text(toReturn.toString()));
    }
}

为了获得最佳性能 - 我应该在哪里跳过重复出现的文件名?映射,减少或两者兼而有之?ps:我是编写MR任务的初学者,也试图用我的问题弄清楚编程逻辑。

4

2 回答 2

1

您将只能删除 Reducer 中的重复项。为此,您可以使用不允许重复的 Set。

public void reduce(Text key, Iterator<Text> values,
        OutputCollector<Text, Text> output, Reporter reporter)
        throws IOException {

    // Text's equals() method should be overloaded to make this work
    Set<Text> outputValues = new HashSet<Text>();

    while (values.hasNext()) {
      // make a new Object because Hadoop may mess with original
      Text value = new Text(values.next());

      // takes care of removing duplicates
      outputValues.add(value);
    }

    boolean first = true;
    StringBuilder toReturn = new StringBuilder();
    Iterator<Text> outputIter = outputValues.iter();
    while (outputIter.hasNext()) {
        if (!first) {
            toReturn.append(", ");
        }
        first = false;
        toReturn.append(outputIter.next().toString());
    }

    output.collect(key, new Text(toReturn.toString()));
}

编辑:根据克里斯的评论将值的副本添加到设置。

于 2012-04-24T20:34:25.703 回答
0

您可以通过进行本地地图聚合和引入组合器来提高性能 - 基本上您希望减少在映射器和化简器之间传输的数据量

本地地图聚合是一个概念,您可以在其中维护类似 LRU 的输出对的地图(或集合)。在您的情况下,当前映射器文档的一组单词(假设每个地图都有一个文档)。这样,您可以在集合中查找单词,并且仅在集合尚未包含该单词时才输出 K,V 对(表明您尚未为其输出条目)。如果集合不包含单词,则输出单词,docid pair,并用单词更新集合。

如果集合太大(比如 5000 或 10000 个条目),则将其清除并重新开始。这样,您将显着地看到映射器输出的值的数量(如果您的值域或值集很小,那么单词就是一个很好的例子)。

您也可以在组合器阶段引入您的减速器逻辑

一旦警告的最后一句话 - 在将 Key / Value 对象添加到集合中时要小心(就像在 Matt D 的回答中一样),hadoop 在引擎盖下重新使用对象,所以如果你得到意想不到的结果,请不要感到惊讶引用 - 始终创建对象的副本。

有一篇关于本地地图聚合的文章(用于字数统计示例),您可能会发现它很有用:

于 2012-04-24T21:07:42.270 回答