4

Ruby 中一个简单的 wordcount reducer 如下所示:

#!/usr/bin/env ruby
wordcount = Hash.new
STDIN.each_line do |line|
keyval = line.split("|")
wordcount[keyval[0]] = wordcount[keyval[0]].to_i+keyval[1].to_i
end

wordcount.each_pair do |word,count|
puts "#{word}|#{count}"
end

它在 STDIN 中获取所有映射器的中间值。不是来自特定的键。所以实际上所有人只有一个减速器(而不是每个单词或每组单词的减速器)。

但是,在 Java 示例中,我看到这个接口获取一个键和值列表作为 inout。这意味着中间映射值在归约之前按键分组,并且归约器可以并行运行:

public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
            public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
              int sum = 0;
              while (values.hasNext()) {
                sum += values.next().get();
              }
              output.collect(key, new IntWritable(sum));
            }
          }

这是 Java 独有的功能吗?或者我可以使用 Ruby 使用 Hadoop Streaming 来做到这一点吗?

4

2 回答 2

5

无论您是否使用流式处理,Reducers 将始终并行运行(如果您没有看到这一点,请验证作业配置是否设置为允许多个 reduce 任务 - 请参阅集群或作业配置中的 mapred.reduce.tasks )。不同之处在于,当您使用 Java 与流式传输时,框架会更好地为您打包。

对于 Java,reduce 任务获取一个针对特定键的所有值的迭代器。如果您在 reduce 任务中对 map 输出求和,这使得遍历值变得容易。在流式传输中,您实际上只是得到一个键值对流。您可以保证这些值将按键排序,并且对于给定键的值不会在 reduce 任务中拆分,但是您需要的任何状态跟踪都取决于您。例如,在 Java 中,您的 map 输出以符号形式出现在您的 reducer 中

key1, {val1, val2, val3} key2, {val7, val8}

使用流式传输,您的输出看起来像

key1, val1 key1, val2 key1, val3 key2, val7 key2, val8

例如,要编写一个计算每个键的值总和的化简器,您需要一个变量来存储您看到的最后一个键,并需要一个变量来存储总和。每次读取新的键值对时,都执行以下操作:

  1. 检查密钥是否与最后一个密钥不同。
  2. 如果是这样,输出您的密钥和当前总和,并将总和重置为零。
  3. 将当前值添加到总和并将最后一个键设置为当前键。

HTH。

于 2009-05-09T19:13:10.013 回答
1

我自己没有尝试过 Hadoop Streaming,但通过阅读文档,我认为您可以实现类似的并行行为。

流式传输将通过键对映射器输出进行分组,而不是将具有关联值的键传递给每个化简器。它还保证具有相同键的值不会被拆分到多个 reducer。这与正常的 Hadoop 功能有些不同,但即便如此,reduce 工作仍将分布在多个 reducer 上。

尝试使用该-verbose选项来获取有关实际情况的更多信息。您还可以尝试使用-D mapred.reduce.tasks=XX 是所需的减速器数量的选项。

于 2009-05-08T13:22:49.350 回答