4

我正在尝试更好地理解 mrjob 的示例

from mrjob.job import MRJob  
class MRWordFrequencyCount(MRJob):

    def mapper(self, _, line):
        yield "chars", len(line)
        yield "words", len(line.split())
        yield "lines", 1

    def reducer(self, key, values):
        yield key, sum(values)
if __name__ == '__main__':
    MRWordFrequencyCount.run()

我运行它

$ python word_count.py my_file.txt

它按预期工作,但我不明白它是如何自动知道它将读取一个文本文件并按每一行分割的。而且我也不确定它的_作用。

据我了解,mapper()为每一行生成三个键/值对是否正确?如果我想处理文件夹中的每个文件怎么办?

并且reducer()自动知道如何添加每个键的值?

如果我想通过 map reduce 运行单元测试,mapper 和 reducer 会是什么样子?甚至有必要吗?

4

3 回答 3

4

mapper 方法接收一个已经从输入文本中解析出来的键值对。mrjob 使用 Hadoop 流式传输,每个输入文本由换行符分割,然后根据使用的输入协议将每行拆分为键值对。这是框架为您处理的事情,因此您不必做任何繁重的工作;您可以假设您将获得正确的键和值。

但是,您确实需要指定指定的输入文本文件类型。例如,如果键和/或值不是纯文本(如在原始问题中)而是序列化的 JSON,那么您使用 JSONProtocol/JSONValueProtocol 等,而不是默认的 RawValueProtocol。

对于初始映射器,每一行都被读入值(通过 RawValueProtocol),这就是你没有收到密钥的原因。Using_只是一个未使用的虚拟变量的 Python 约定。(然而,_它实际上是 Python 变量的有效名称。你可以做这样的事情a = 3; _ = 2; b = a + _。亵渎神明,不是吗?)

mrjob 可以接受多个输入文件。你可以做例如

$ python wordcount.py text1.txt text2.txt

如果您希望将所有文本文件作为 mrjob 作业的输入,您可以执行以下操作

$ python wordcount.py inputdir/*.txt

或者只是简单地

$ python wordcount.py inputdir

并且所有选择的文件都用作输入。

reducer 接收的是一个键和与该键关联的所有值的迭代器。因此,如果您举例,valuesreducer 方法中的变量是一个迭代器。如果你想对所有值做某事,你需要对所有值进行实际迭代。在问题的具体示例中,内置函数sum可以将迭代器作为参数,这就是为什么您可以一次性完成的原因。但它实际上类似于sum([value for value in values]).

我实际上不知道您将如何对 mrjob 脚本进行单元测试。我通常只是在生产运行之前对一小部分测试数据进行了测试。

于 2014-04-22T06:47:01.313 回答
2

我对mrjob了解不多,所以我将做一些假设。首先,_ 表示忽略密钥(经过谷歌搜索验证)。其次,我认为它可以在逗号分隔的文件列表或目录上工作。接下来,此代码没有设置,可能是因为这些是默认方法名称。我敢肯定,如果您将映射器或减速器命名为其他 mrjob 无法自动获取的名称。

我在这里找到了一些例子。

于 2014-04-22T04:19:39.020 回答
0
from mrjob.job import MRJob

class MRRatingCounter(MRJob):
    def mapper(self, key, line):


        (userID, movieID, rating, timestamp) = line.split('\t')
        yield rating, 1

    def reducer(self, rating, occurences):
        yield rating, sum(occurences)

if __name__ == '__main__':
    MRRatingCounter.run()

所以请根据上面的讨论纠正我:

如果我错了,请纠正我,所以在这种情况下,关键是取输入文件的值,在这种情况下,我可以像这样在我的脑海中阅读它:

def mapper ( self{ 这是对象/实例} , key{ 输入文本文件在这种情况下是文件名并更正我 ml-100k/u.data , line{ 这是正在尝试传递给映射器() 每次都来自数据文件 ., )

我正在尝试学习的 Udemy 的另一个代码是原始问题所问的:class MRFriendsByAge(MRJob):

def mapper(self, _, line):
    (ID, name, age, numFriends) = line.split(',')
    yield age, float(numFriends)

def reducer(self, age, numFriends):
    total = 0
    numElements = 0
    for x in numFriends:
        total += x
        numElements += 1

    yield age, total / numElements

如果名称== '主要': MRFriendsByAge.run()

找到一本 MR 工作簿,并试图看看它是否有意义,但我仍在苦苦挣扎

于 2018-01-11T04:33:04.803 回答