0

我在 python 中的单个 hadoop 伪分布式节点上运行流式 hadoop 作业,还使用 ​​hadoop-lzo 在 .lzo 压缩输入文件上生成拆分。

使用小型压缩或未压缩测试数据集时,一切都按预期工作;MapReduce 输出与简单的 'cat | 地图 | 排序 | 减少'Unix中的管道。- 输入是否被压缩。

但是,一旦我开始处理单个大型 .lzo(预索引)数据集(压缩约 40GB)并将作业拆分为多个映射器,输出看起来会被截断 - 只有前几个键值存在。

代码 + 输出如下 - 如您所见,这是测试整个过程的非常简单的计数。

来自测试数据(大型数据集的子集)的直接 unix 管道的输出;

lzop -cd objectdata_input.lzo | ./objectdata_map.py | sort | ./objectdata_red.py

3656  3
3671  3
51    6

hadoop 作业对测试数据的输出(与上述相同的测试数据)

hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-streaming-*.jar -input objectdata_input.lzo -inputformat com.hadoop.mapred.DeprecatedLzoTextInputFormat -output retention_counts -mapper objectdata_map.py -reducer objectdata_red.py -file /home/bob/python-dev/objectdata_map.py -file /home/bob/python-dev/objectdata_red.py

3656  3
3671  3
51    6

现在,测试数据是来自真实数据集的一小部分行,所以我至少希望在针对完整数据集运行作业时在结果输出中看到上面的键。但是,我得到的是;

hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-streaming-*.jar -input objectdata_input_full.lzo -inputformat com.hadoop.mapred.DeprecatedLzoTextInputFormat -output retention_counts -mapper objectdata_map.py -reducer objectdata_red.py -file /home/bob/python-dev/objectdata_map.py -file /home/bob/python-dev/objectdata_red.py

1       40475582
12      48874
14      8929777
15      219984
16      161340
17      793211
18      78862
19      47561
2       14279960
20      56399
21      3360
22      944639
23      1384073
24      956886
25      9667
26      51542
27      2796
28      336767
29      840
3       3874316
30      1776
33      1448
34      12144
35      1872
36      1919
37      2035
38      291
39      422
4       539750
40      1820
41      1627
42      97678
43      67581
44      11009
45      938
46      849
47      375
48      876
49      671
5       262848
50      5674
51      90
6       6459687
7       4711612
8       20505097
9       135592

...根据数据集,键的数量比我预期的要少得多。

我不太担心密钥本身 - 考虑到输入数据集,可以预期这个集合,我更担心应该有更多的密钥,以千计。当我在 unix 管道中针对数据集中的前 2500 万条记录运行代码时,我得到的密钥范围约为 1 - 7000。

所以,这个输出似乎只是我实际期望的前几行,我不知道为什么。我错过了整理许多 part-0000# 文件吗?或类似的东西?这只是我在家里测试的一个单节点伪分布式hadoop,所以如果有更多的part-#文件要收集,我不知道它们可能在哪里;它们不会出现在 HDFS 的保留计数目录中。

mapper 和 reducer 代码如下 - 与许多字数示例一样有效;

objectdata_map.py

#!/usr/bin/env python

import sys
RETENTION_DAYS=(8321, 8335)

for line in sys.stdin:
        line=line.strip()
        try:
                retention_days=int(line[RETENTION_DAYS[0]:RETENTION_DAYS[1]])
                print "%s\t%s" % (retention_days,1)
        except:
                continue

objectdata_red.py

#!/usr/bin/env python                                                                                                                                    

import sys                                                                                                                                               
last_key=None
key_count=0
for line in sys.stdin:
        key=line.split('\t')[0]
        if last_key and last_key!=key:
                print "%s\t%s" % (last_key,key_count)
                key_count=1
        else:
                key_count+=1

        last_key=key

print "%s\t%s" % (last_key,key_count)

这一切都在手动安装的 hadoop 1.1.2 上,伪分布式模式,从构建和安装 hadoop-lzo

https://github.com/kevinweil/hadoop-lzo

4

0 回答 0