1

根据在雅虎的hadoop教程上找到的附图,操作的顺序是map > combine > partition,应该遵循reduce

这是我的 map 操作发出的示例键

LongValueSum:geo_US|1311722400|E        1

假设有 100 个相同类型的键,这应该组合为

geo_US|1311722400|E     100

然后我想按第一个管道(|) http://hadoop.apache.org/common/docs/r0.20.2/streaming.html#A+Useful+Partitioner+Class+%28secondary之前的值对键进行分区+sort%2C+the+-partitioner+org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner+option%29

geo_US

所以这是我的流媒体命令

hadoop jar /usr/local/hadoop/contrib/streaming/hadoop-streaming-0.20.203.0.jar \
-D mapred.reduce.tasks=8 \
-D stream.num.map.output.key.fields=1 \
-D mapred.text.key.partitioner.options=-k1,1 \
-D stream.map.output.field.separator=\| \
-file mapper.py \
-mapper mapper.py \
-file reducer.py \
-reducer reducer.py \
-combiner org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorReducer \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
-input input_file \
-output output_path

这是我得到的错误

java.lang.NumberFormatException: For input string: "1311722400|E    1"
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:48)
at java.lang.Long.parseLong(Long.java:419)
at java.lang.Long.parseLong(Long.java:468)
at org.apache.hadoop.mapred.lib.aggregate.LongValueSum.addNextValue(LongValueSum.java:48)
at org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorReducer.reduce(ValueAggregatorReducer.java:59)
at org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorReducer.reduce(ValueAggregatorReducer.java:35)
at org.apache.hadoop.mapred.Task$OldCombinerRunner.combine(Task.java:1349)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1435)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1297)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:436)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:371)
at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
at org.apache.hadoop.mapred.Child.main(Child.java:253)

我看起来partitionercombiner之前运行。有什么想法吗?

4

2 回答 2

1

无法保证对于大于 0.16 的 hadoop 版本实际运行组合器。在 hadoop 17 中,如果单个<K,V>占用整个排序缓冲区,则不会运行组合器。在版本 > 0.18 中,组合器可以在 map 和 reduce 阶段运行多次。

基本上你的算法不应该依赖于是否调用了 Combine 函数,因为它只是一种优化。欲了解更多信息,请查看Haddop 一书,权威指南.. 在这里找到了关于 Google 图书上的组合功能的片段

于 2011-08-07T03:29:06.257 回答
1

我已经检查了“Hadoop权威指南”第6章随机和排序。映射输出首先在内存中缓冲。当内存超过其阈值时,映射输出将被写入磁盘。在写入磁盘之前,数据将被分区。在每个分区内,数据将按键排序。之后如果有组合器功能,则组合排序输出。

磁盘上可能有很多溢出文件,如果至少有 3 个溢出文件,组合器将在输出写入磁盘之前再次运行。

最后,所有溢出文件将被合并到一个文件中,以减少 IO 数量。

简而言之,对于 mapper:map --> partition --> sort ---> combiner

对于reduer:复制表单映射器->合并(如果存在则调用组合器)->减少

于 2014-03-20T03:38:26.500 回答