我正在尝试理解 Python 中的 Hadoop 字数统计示例 http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/
作者从朴素版本的 mapper 和 reducer 开始。这是减速器(为简洁起见,我删除了一些评论)
#!/usr/bin/env python
from operator import itemgetter
import sys
current_word = None
current_count = 0
word = None
# input comes from STDIN
for line in sys.stdin:
line = line.strip()
word, count = line.split('\t', 1)
try:
count = int(count)
except ValueError:
continue
if current_word == word:
current_count += count
else:
if current_word:
# write result to STDOUT
print '%s\t%s' % (current_word, current_count)
current_count = count
current_word = word
# do not forget to output the last word if needed!
if current_word == word:
print '%s\t%s' % (current_word, current_count)
作者使用以下方法测试程序:
echo "foo foo quux labs foo bar quux" | /home/hduser/mapper.py | sort -k1,1 | /home/hduser/reducer.py
所以reducer的写法就好像reducer作业的输入数据是这样的:
aa 1
aa 1
bb 1
cc 1
cc 1
cc 1
我最初对 reducer 的理解是,给定 reducer 的输入数据将包含一个唯一键。所以在前面的例子中,需要 3 个 reducers 作业。我的理解不正确吗?
然后作者介绍了mapper和reducer的改进版本。这是减速器:
#!/usr/bin/env python
"""A more advanced Reducer, using Python iterators and generators."""
from itertools import groupby
from operator import itemgetter
import sys
def read_mapper_output(file, separator='\t'):
for line in file:
yield line.rstrip().split(separator, 1)
def main(separator='\t'):
# input comes from STDIN (standard input)
data = read_mapper_output(sys.stdin, separator=separator)
for current_word, group in groupby(data, itemgetter(0)):
try:
total_count = sum(int(count) for current_word, count in group)
print "%s%s%d" % (current_word, separator, total_count)
except ValueError:
# count was not a number, so silently discard this item
pass
if __name__ == "__main__":
main()
作者添加了以下警告:
注意:以下 Map 和 Reduce 脚本只有在 Hadoop 上下文中运行时才能“正确”工作,即作为 MapReduce 作业中的 Mapper 和 Reducer。这意味着运行简单的测试命令“cat DATA | ./mapper.py | 排序-k1,1 | ./reducer.py”将不再正常工作,因为某些功能被有意外包给 Hadoop。
我不明白为什么天真的测试命令不适用于新版本。我认为使用sort -k1,1
会为减速器的两个版本产生相同的输入。我错过了什么?