0

我的一个映射器生成了一些分布在文件中的日志,如 part-0、part-1、part-2 等。现在每个文件都有一些查询和该查询的一些关联数据:

part-0

q             score         
1 ben 10      4.01
horse shoe    5.96
...

part-1

1 ben 10        3.23
horse shoe      2.98
....

and so on for part-2,3 etc.

现在相同的查询 q 即上面的“1 ben 10”驻留在第 1 部分、第 2 部分等中。

现在我必须编写一个 map reduce 阶段,我可以在其中收集相同的查询并汇总(加起来)他们的分数。

我的 mapper 函数可以是一个身份,在 reduce 中我将完成这项任务。

输出将是:

q       aggScore
1 ben 10    7.24
horse shoe  8.96
...

似乎是一项简单的任务,但我无法想到如何继续进行此操作(阅读很多但实际上无法继续)。我可以考虑通用算法问题,首先我将收集常见查询,然后将它们的分数相加。

任何有关pythonic解决方案或算法(map reduce)提示的帮助将不胜感激。

4

1 回答 1

1

这是 MapReduce 解决方案:

地图输入:每个输入文件(part-0、part-1、part-2、...)都可以输入到单独的(单独的)地图任务。

对于输入文件中的每个输入行,Mapper 发出<q,aggScore>. 如果单个文件中有多个查询的分数,Map 会将它们全部相加,否则如果我们知道每个查询只会在每个文件中出现一次,则 map 可以是<q,aggScore>为每个输入行发出的恒等函数。

Reducer 输入格式<q,list<aggScore1,aggScore2,...>为 Reducer 操作类似于著名的 MapReduce 示例wordcount。如果您使用的是 Hadoop,则可以对 Reducer 使用以下方法。

public void reduce(Text q, Iterable<IntWritable> aggScore, Context context) throws IOException, InterruptedException {
   int sum = 0;
   for (IntWritable val : aggScore) {
      sum += val.get();
   }
   context.write(q, new IntWritable(sum));
}

aggScores该方法将汇总特定的所有内容q并为您提供所需的输出。reducer 的 python 代码应该是这样的(这里q是键,列表aggScores是值):

def reduce(self, key, values, output, reporter):
    sum = 0
    while values.hasNext():
        sum += values.next().get()
    output.collect(key, IntWritable(sum))
于 2013-02-26T07:54:34.900 回答