2

我试图产生从映射器生成的每个键值对的概率。

所以,让我们说映射器产生:

a, (r, 5)
a, (e, 6)
a, (w, 7)

我需要添加 5+6+7 = 18 然后找到概率 5/18、6/18、7/18

所以减速器的最终输出看起来像:

a, [[r, 5, 0.278], [e, 6, 0.33], [w, 7, 0.389]]

到目前为止,我只能让 reducer 将值中的所有整数相加。我怎样才能让它返回并将每个实例除以总和?

谢谢!

4

3 回答 3

5

Pai 的解决方案在技术上是正确的,但实际上这会给您带来很多麻烦,因为设置分区可能会很痛苦(请参阅https://groups.google.com/forum/#!topic/mrjob/aV7bNn0sJ2k)。

您可以通过使用 mrjob.step 更轻松地完成此任务,然后创建两个减速器,例如在此示例中:https ://github.com/Yelp/mrjob/blob/master/mrjob/examples/mr_next_word_stats.py

按照您描述的方式进行操作:

from mrjob.job import MRJob
import re
from mrjob.step import MRStep
from collections import defaultdict

wordRe = re.compile(r"[\w]+")

class MRComplaintFrequencyCount(MRJob):

    def mapper(self, _, line):
        self.increment_counter('group','num_mapper_calls',1)

        #Issue is third column in csv
        issue = line.split(",")[3]

        for word in wordRe.findall(issue):
            #Send all map outputs to same reducer
            yield word.lower(), 1

    def reducer(self, key, values):
        self.increment_counter('group','num_reducer_calls',1)  
        wordCounts = defaultdict(int)
        total = 0         
        for value in values:
            word, count = value
            total+=count
            wordCounts[word]+=count

        for k,v in wordCounts.iteritems():
            # word, frequency, relative frequency 
            yield k, (v, float(v)/total)

    def combiner(self, key, values):
        self.increment_counter('group','num_combiner_calls',1) 
        yield None, (key, sum(values))


if __name__ == '__main__':
    MRComplaintFrequencyCount.run()

这会进行标准字数统计并主要在组合器中聚合,然后使用“None”作为公共键,因此每个单词都间接地在同一个键下发送到减速器。在 reducer 中,您可以获得总字数并计算相对频率。

于 2016-09-20T17:08:44.453 回答
4

您在上面所做的也应该有效,但这是假设单个键的所有数据都适合内存。如果是这样,那么在 Reducer 中,您可以将所有值保存在内存中,然后计算您的总数,然后计算每个键值对的边际。这通常称为“条纹”方法。

但是,现在大多数情况下这可能是正确的,并且数据可能不适合内存。在这种情况下,您必须找到一种方法在实际键值对之前发送值来计算您的总数,以便它们可以用于计算边际并立即发出值。

这是“反转顺序”设计模式的候选者。当您需要计算相对频率时,它很有用。基本思想是在 Mapper 端,您为每个中间数据发出 2 个键值对,其中一个键值对对所有值具有相同的公共键。这将用于计算总数。

例子:

For a, (r, 5) :
---------------
emit (a, r), 5
emit (a, *), 5


For a, (e, 6) :
---------------
emit (a, e), 6
emit (a, *), 6


For a, (w, 7) :
---------------
emit (a, w), 7
emit (a, *), 7

完成此操作后,您需要一个分区器,该分区器将仅使用键中的第一个值对每个中间键值对进行分区。在上面的示例中使用“a”。

您还需要一个键排序顺序,始终将具有 * 的键放在键的第二部分。

这样,所有中间键在键的第一部分都有“a”,最终会出现在同一个 reducer 中。此外,它们将以如下所示的方式排序 -

emit (a, *), 5
emit (a, *), 6
emit (a, *), 7
emit (a, e), 6
emit (a, r), 5
emit (a, w), 7

在 reducer 中,当您遍历键值对时,如果键的第二部分有 *,您将不得不简单地从键中累积值。然后,您可以使用累积值来计算所有其他键值对的边际。

total = 0
for(value : values){
    if (key.second == *)
        total += value
    else
        emit (key.first , key.second, value, value/total)
}

这种设计模式通常称为使用对方法的反转顺序。有关此设计模式和其他设计模式的更多信息,我建议阅读本书中有关 MapReduce 设计模式的章节 - http://lintool.github.com/MapReduceAlgorithms/。通过示例很好地解释了它。

于 2013-03-25T02:46:11.077 回答
1

您可以像您一样简单地计算总和,并将这些对保存在内存中,以发出您想要的概率,如下所示:

reduce (key, list<values>):
    int sum = 0;
    for (value in values) {
        sum = sum + value.frequency; //assuming you can extract two fields in each value: value.word and value.frequency
    }
    String outputValue = "[";
    for (value in values) { //iterate over the values once more
        outputValue = outputValue + "["+ value.word + ", " +value.frequency + ", "+ value.frequency/sum +"],"
    }
    outputValue = outputValue.replaceLast(",","]");
    emit (key, outputValue);

当然,这只是一个伪代码,因为我不习惯 python,但我希望过渡应该很容易。

于 2016-09-21T08:04:20.803 回答