我在 python 中的单个 hadoop 伪分布式节点上运行流式 hadoop 作业,还使用 hadoop-lzo 在 .lzo 压缩输入文件上生成拆分。
使用小型压缩或未压缩测试数据集时,一切都按预期工作;MapReduce 输出与简单的 'cat | 地图 | 排序 | 减少'Unix中的管道。- 输入是否被压缩。
但是,一旦我开始处理单个大型 .lzo(预索引)数据集(压缩约 40GB)并将作业拆分为多个映射器,输出看起来会被截断 - 只有前几个键值存在。
代码 + 输出如下 - 如您所见,这是测试整个过程的非常简单的计数。
来自测试数据(大型数据集的子集)的直接 unix 管道的输出;
lzop -cd objectdata_input.lzo | ./objectdata_map.py | sort | ./objectdata_red.py
3656 3
3671 3
51 6
hadoop 作业对测试数据的输出(与上述相同的测试数据)
hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-streaming-*.jar -input objectdata_input.lzo -inputformat com.hadoop.mapred.DeprecatedLzoTextInputFormat -output retention_counts -mapper objectdata_map.py -reducer objectdata_red.py -file /home/bob/python-dev/objectdata_map.py -file /home/bob/python-dev/objectdata_red.py
3656 3
3671 3
51 6
现在,测试数据是来自真实数据集的一小部分行,所以我至少希望在针对完整数据集运行作业时在结果输出中看到上面的键。但是,我得到的是;
hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-streaming-*.jar -input objectdata_input_full.lzo -inputformat com.hadoop.mapred.DeprecatedLzoTextInputFormat -output retention_counts -mapper objectdata_map.py -reducer objectdata_red.py -file /home/bob/python-dev/objectdata_map.py -file /home/bob/python-dev/objectdata_red.py
1 40475582
12 48874
14 8929777
15 219984
16 161340
17 793211
18 78862
19 47561
2 14279960
20 56399
21 3360
22 944639
23 1384073
24 956886
25 9667
26 51542
27 2796
28 336767
29 840
3 3874316
30 1776
33 1448
34 12144
35 1872
36 1919
37 2035
38 291
39 422
4 539750
40 1820
41 1627
42 97678
43 67581
44 11009
45 938
46 849
47 375
48 876
49 671
5 262848
50 5674
51 90
6 6459687
7 4711612
8 20505097
9 135592
...根据数据集,键的数量比我预期的要少得多。
我不太担心密钥本身 - 考虑到输入数据集,可以预期这个集合,我更担心应该有更多的密钥,以千计。当我在 unix 管道中针对数据集中的前 2500 万条记录运行代码时,我得到的密钥范围约为 1 - 7000。
所以,这个输出似乎只是我实际期望的前几行,我不知道为什么。我错过了整理许多 part-0000# 文件吗?或类似的东西?这只是我在家里测试的一个单节点伪分布式hadoop,所以如果有更多的part-#文件要收集,我不知道它们可能在哪里;它们不会出现在 HDFS 的保留计数目录中。
mapper 和 reducer 代码如下 - 与许多字数示例一样有效;
objectdata_map.py
#!/usr/bin/env python
import sys
RETENTION_DAYS=(8321, 8335)
for line in sys.stdin:
line=line.strip()
try:
retention_days=int(line[RETENTION_DAYS[0]:RETENTION_DAYS[1]])
print "%s\t%s" % (retention_days,1)
except:
continue
objectdata_red.py
#!/usr/bin/env python
import sys
last_key=None
key_count=0
for line in sys.stdin:
key=line.split('\t')[0]
if last_key and last_key!=key:
print "%s\t%s" % (last_key,key_count)
key_count=1
else:
key_count+=1
last_key=key
print "%s\t%s" % (last_key,key_count)
这一切都在手动安装的 hadoop 1.1.2 上,伪分布式模式,从构建和安装 hadoop-lzo