
I've just started using mrjob (MapReduce for Python) and I'm new to the MapReduce paradigm. I have a question about the word_count.py tutorial on the mrjob documentation site.

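The docs say that if we create word_count.py and run it on a text file, it will count and return the number of lines, characters, and words in that file. Here is the code they use for word_count.py: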

from mrjob.job import MRJob


class MRWordFrequencyCount(MRJob):

    def mapper(self, _, line):
        yield "chars", len(line)
        yield "words", len(line.split())
        yield "lines", 1

    def reducer(self, key, values):
        yield key, sum(values)


if __name__ == '__main__':
    MRWordFrequencyCount.run()

I understand that we subclass MRJob and override the mapper and reducer methods. What I don't get is that we run the job by passing it the entire text file, like this:

python word_count.py entire_text_file.txt

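So how does the mapper know to parse one line at a time? Basically, what is the input to the mapper() function defined above: the contents of the whole file, or one line at a time? If it is a single line, which part of the mrjob code is responsible for feeding lines to mapper() one at a time? I hope that makes my originally vague question a bit less vague; this has me completely stumped. Any help would be greatly appreciated.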

Thanks in advance!


1 Answer


Well, I guess the best answer is RTFC :P

If you look at /usr/lib/python2.6/site-packages/mrjob/job.py (assuming you installed mrjob with pip on Python 2.6), you'll see exactly how it reads lines from the input and runs the mapper on each line:

def run_mapper(self, step_num=0):
    ...

    # pick input and output protocol
    read_lines, write_line = self._wrap_protocols(step_num, 'mapper')

    if mapper_init:
        for out_key, out_value in mapper_init() or ():
            write_line(out_key, out_value)

    # run the mapper on each line
    for key, value in read_lines():
        for out_key, out_value in mapper(key, value) or ():
            write_line(out_key, out_value)

    if mapper_final:
        for out_key, out_value in mapper_final() or ():
            write_line(out_key, out_value)

And here is the definition of read_lines:

def _wrap_protocols(self, step_num, step_type):
    """Pick the protocol classes to use for reading and writing
    for the given step, and wrap them so that bad input and output
    trigger a counter rather than an exception unless --strict-protocols
    is set.

    Returns a tuple of ``(read_lines, write_line)``

    ``read_lines()`` is a function that reads lines from input, decodes
        them, and yields key, value pairs.
    ``write_line()`` is a function that takes key and value as args,
        encodes them, and writes a line to output.

    :param step_num: which step to run (e.g. 0)
    :param step_type: ``'mapper'``, ``'reducer'``, or ``'combiner'`` from
                      :py:mod:`mrjob.step`
    """
    read, write = self.pick_protocols(step_num, step_type)

    def read_lines():
        for line in self._read_input():
            try:
                key, value = read(line.rstrip('\r\n'))
                yield key, value
            except Exception, e:
                if self.options.strict_protocols:
                    raise
                else:
                    self.increment_counter(
                        'Undecodable input', e.__class__.__name__)

    def write_line(key, value):
        try:
            print >> self.stdout, write(key, value)
        except Exception, e:
            if self.options.strict_protocols:
                raise
            else:
                self.increment_counter(
                    'Unencodable output', e.__class__.__name__)

    return read_lines, write_line
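
This also hints at why the mapper's first argument is written as `_`. As far as I know, mrjob's default input protocol treats each line as a raw value with no key, so read() hands back None as the key and the line itself as the value. Here is a rough sketch of that idea in plain Python; `raw_value_read` is a hypothetical stand-in I made up for illustration, not mrjob's actual code:

```python
def raw_value_read(line):
    # Hypothetical stand-in for the input protocol's read():
    # no key, and the whole line is the value.
    return None, line


def read_lines(raw_lines):
    # Same shape as the read_lines() closure above: strip the line
    # ending, decode the line, and yield one (key, value) pair per line.
    for line in raw_lines:
        yield raw_value_read(line.rstrip('\r\n'))


pairs = list(read_lines(["first line\n", "second line\r\n"]))
print(pairs)
```

Each pair is what gets passed to mapper(key, value), so the mapper sees one line per call and can ignore the key.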

Finally, you can read the read_input and read_file functions in /usr/lib/python2.6/site-packages/mrjob/util.py. Hope that helps.
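
If it helps to see the whole flow in one place, here is a minimal simulation that does not use mrjob at all. It assumes the file is read by simply iterating over the file object (which yields one line at a time) and that the grouping between map and reduce can be faked with a dict; `simulate_job` is a name I invented, and the real library does a lot more than this:

```python
import os
import tempfile
from collections import defaultdict


def mapper(_, line):
    yield "chars", len(line)
    yield "words", len(line.split())
    yield "lines", 1


def reducer(key, values):
    yield key, sum(values)


def simulate_job(path):
    # Hypothetical driver: call the mapper once per line, group the
    # emitted values by key, then call the reducer once per key.
    grouped = defaultdict(list)
    with open(path) as f:
        for raw in f:  # iterating a file object yields one line at a time
            for out_key, out_value in mapper(None, raw.rstrip('\r\n')):
                grouped[out_key].append(out_value)
    results = {}
    for key in sorted(grouped):
        for out_key, out_value in reducer(key, grouped[key]):
            results[out_key] = out_value
    return results


# Write a small sample file, run the simulated job on it, then clean up.
with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as tmp:
    tmp.write("hello world\nfoo bar baz\n")
totals = simulate_job(tmp.name)
os.remove(tmp.name)
print(totals)
```

So the mapper never sees the whole file; the code driving it does the line splitting and calls mapper() once per line.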

answered 2013-11-14T03:40:51.290