我写了一个 MapReduce 作业,对数据集进行 ngram 计数。结果是一百个 300MB 格式的文件,<ngram>\t<count>
. 我想将这些组合成一个结果,但我的几次组合尝试都失败了(“任务跟踪器已经消失”)。我的超时时间是 8 小时,而这次崩溃发生在 8.5 小时左右,所以可能是相关的。我有#reducers=5(与# of nodes 相同)。也许我只需要留出更多时间,尽管错误似乎并未表明这一点。我怀疑我的节点正在过载,并且变得无响应。我的理论是我的减速器可以使用一些优化。
我正在使用cat
我的映射器,以及我的减速器的以下 python 脚本:
#!/usr/bin/env python
import sys
counts = {}
for line in sys.stdin:
line = line.strip()
key, count = line.split('\t', 1)
try:
count = int(count)
except ValueError:
continue
if key not in counts:
counts[key] = 0
counts[key] += count
for key in sorted(counts.keys()):
print '%s\t%s'% (key, counts[key])
更新:
正如我在我的一条评论中暗示的那样,我对 Hadoop 自动进行的排序感到困惑。在 Web UI 中,reducer 状态显示了几个不同的阶段,包括“排序”和“减少”。由此我假设Hadoop在将映射器输出发送到reduce之前对其进行排序,但不清楚的是排序是针对发送到reducer的所有数据,还是在减少之前的每个文件。换句话说,我的映射器获取 100 个字段,将其拆分为 400 个输出,每个输出都简单cat
- 将它们发送到减速器,然后减速器(总共 5 个)每个接收这 80 个流。sort 合并所有 80,还是排序 1,减少它;ETC?根据图表,这显然不能指示实际行为,排序过程发生在任何归约之前。如果排序确实对所有输入文件进行了排序,那么我可以简化我的 reducer,使其不存储所有计数的字典,并在键更改后打印出 key-totalCount 对。
关于组合器的使用,我认为这对我来说没有好处,因为我正在减少的数据已经在我尝试组合的 100 个文件中减少了。由于我的#nodes = #reducers (5 & 5),没有什么可以组合reducer 还没有做的。