0

我正在尝试理解 Python 中的 Hadoop 字数统计示例 http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/

作者从朴素版本的 mapper 和 reducer 开始。这是减速器(为简洁起见,我删除了一些评论)

#!/usr/bin/env python

from operator import itemgetter
import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    line = line.strip()

    word, count = line.split('\t', 1)

    try:
        count = int(count)
    except ValueError:
        continue

    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print '%s\t%s' % (current_word, current_count)
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    print '%s\t%s' % (current_word, current_count)

作者使用以下方法测试程序:

echo "foo foo quux labs foo bar quux" | /home/hduser/mapper.py | sort -k1,1 | /home/hduser/reducer.py

所以reducer的写法就好像reducer作业的输入数据是这样的:

aa 1
aa 1
bb 1
cc 1
cc 1
cc 1

我最初对 reducer 的理解是,给定 reducer 的输入数据将包含一个唯一键。所以在前面的例子中,需要 3 个 reducers 作业。我的理解不正确吗?

然后作者介绍了mapper和reducer的改进版本。这是减速器:

#!/usr/bin/env python
"""A more advanced Reducer, using Python iterators and generators."""

from itertools import groupby
from operator import itemgetter
import sys

def read_mapper_output(file, separator='\t'):
    for line in file:
        yield line.rstrip().split(separator, 1)

def main(separator='\t'):
    # input comes from STDIN (standard input)
    data = read_mapper_output(sys.stdin, separator=separator)

    for current_word, group in groupby(data, itemgetter(0)):
        try:
            total_count = sum(int(count) for current_word, count in group)
            print "%s%s%d" % (current_word, separator, total_count)
        except ValueError:
            # count was not a number, so silently discard this item
            pass

if __name__ == "__main__":
    main()

作者添加了以下警告:

注意:以下 Map 和 Reduce 脚本只有在 Hadoop 上下文中运行时才能“正确”工作,即作为 MapReduce 作业中的 Mapper 和 Reducer。这意味着运行简单的测试命令“cat DATA | ./mapper.py | 排序-k1,1 | ./reducer.py”将不再正常工作,因为某些功能被有意外包给 Hadoop。

我不明白为什么天真的测试命令不适用于新版本。我认为使用sort -k1,1会为减速器的两个版本产生相同的输入。我错过了什么?

4

1 回答 1

0

关于您的第一个问题:“我对减速器的最初理解是,给定减速器的输入数据将包含一个唯一键。因此在前面的示例中,需要 3 个减速器作业。我的理解不正确吗?”

MapReduce 抽象与 Hadoop 对该抽象的实现之间存在差异。在抽象中,reducer 与唯一键相关联。另一方面,Hadoop 实现将多个键分配给同一个 reducer(以避免关闭进程和启动新进程的成本)。特别是,在 Hadoop 流中,reducer 接收与一定数量的键(可能是零、一个或多个键)对应的键值对,但您可以保证与某个键关联的键值对将彼此接连而来。

例如,让我们以输入“foo foo quux labs foo bar quux”为例进行字数统计。然后可能是一个reducer 接收输入“bar 1\nfoo 1\nfoo 1\nfoo1”,另一个reducer 接收“labs 1\nquux 1\nquux 1”。实际运行的 reducer 进程的数量由您使用选项 mapred.reduce.tasks 决定。例如要使用 2 个减速器,您可以这样做

 $ bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar -D mapred.reduce.tasks=2 -mapper ....

关于你的第二个问题,我同意你的观点sort -k1,1,所以我也没有看到问题。

于 2013-08-18T05:31:02.427 回答