5

我是 Hadoop 新手,我已经设法运行 wordCount 示例:http ://hadoop.apache.org/common/docs/r0.18.2/mapred_tutorial.html

假设我们有一个包含 3 个文件的文件夹。我希望每个文件都有一个映射器,这个映射器只会计算行数并将其返回给减速器。

然后,reducer 将每个映射器的行数作为输入,并将所有 3 个文件中存在的总行数作为输出。

所以如果我们有以下3个文件

input1.txt
input2.txt
input3.txt

映射器返回:

mapper1 -> [input1.txt, 3]
mapper2 -> [input2.txt, 4]
mapper3 -> [input3.txt, 9]

减速器将输出

3+4+9 = 16 

我已经在一个简单的 java 应用程序中完成了这个,所以我想在 Hadoop 中完成它。我只有一台计算机,想尝试在伪分布式环境上运行。

我怎样才能做到这一点?我应该采取哪些正确的步骤?

我的代码应该在 apache 的示例中看起来像这样吗?我将有两个静态类,一个用于映射器,一个用于减速器?还是我应该有 3 个类,每个映射器一个?

如果你能指导我完成这个,我不知道该怎么做,我相信如果我设法编写一些代码来做这些事情,那么我将来能够编写更复杂的应用程序。

谢谢!

4

2 回答 2

11

除了 sa125 的答案,您可以通过不为每个输入记录发出记录来极大地提高性能,而只是在映射器中累积一个计数器,然后在映射器清理方法中发出文件名和计数值:

public class LineMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    protected long lines = 0;

    @Override
    protected void cleanup(Context context) throws IOException,
            InterruptedException {
        FileSplit split = (FileSplit) context.getInputSplit();
        String filename = split.getPath().toString();

        context.write(new Text(filename), new LongWritable(lines));
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        lines++;
    }
}
于 2012-04-29T19:06:40.113 回答
5

我注意到您使用的是 0.18 版本的文档。这是 1.0.2(最新)的链接。

第一个建议 - 使用 IDE(eclipse、IDEA 等)。这真的有助于填补空白。

在实际的 HDFS 中,您无法知道每个文件的位置(不同的机器和集群)。无法保证 X 行甚至与 Y 行驻留在同一个磁盘上。也不能保证 X 行不会被拆分到不同的机器上(HDFS 以块的形式分布数据,通常每个 64Mb)。这意味着您不能假设同一个映射器将处理整个文件。您可以确保每个文件都由同一个 reducer 处理

由于从映射器发送的每个键的减速器都是唯一的,所以我这样做的方法是使用文件名作为映射器中的输出键。此外,映射器的默认输入类是TextInputFormat,这意味着每个映射器将自己接收一整行(由 LF 或 CR 终止)。然后,您可以从映射器发出文件名和数字 1(或其他与计算无关的)。然后,在 reducer 中,您只需使用循环来计算文件名被接收的次数:

在映射器的 map 函数中

public static class Map extends Mapper<IntWritable, Text, Text, Text> {

  public void map(IntWritable key, Text value, Context context) {
    // get the filename
    InputSplit split = context.getInputSplit();
    String fileName = split.getPath().getName();

    // send the filename to the reducer, the value
    // has no meaning (I just put "1" to have something)
    context.write( new Text(fileName), new Text("1") );
  }

}

在 reducer 的 reduce 函数中

public static class Reduce extends Reducer<Text, Text, Text, Text> {

  public void reduce(Text fileName, Iterator<Text> values, Context context) {
    long rowcount = 0;

    // values get one entry for each row, so the actual value doesn't matter
    // (you can also get the size, I'm just lazy here)
    for (Text val : values) {
      rowCount += 1;
    }

    // fileName is the Text key received (no need to create a new object)
    context.write( fileName, new Text( String.valueOf( rowCount ) ) );
  }

}

在驱动程序/主要

您几乎可以使用与 wordcount 示例相同的驱动程序 - 请注意,我使用了新的 mapreduce API,因此您需要调整一些东西(Job而不是JobConf等)。当我阅读它时,这真的很有帮助。

请注意,您的 MR 输出将只是每个文件名及其行数:

input1.txt    3
input2.txt    4
input3.txt    9

如果您只想计算所有文件中的总行数,只需在所有映射器中发出相同的键(而不是文件名)。这样就只有一个 reducer 来处理所有的行计数:

// no need for filename
context.write( new Text("blah"), new Text("1") );

您还可以将处理每个文件行数的输出的作业链接起来,或者做其他花哨的事情——这取决于您。

我留下了一些样板代码,但基础知识就在那里。一定要检查我,因为我是从记忆中输入的大部分内容.. :)

希望这可以帮助!

于 2012-04-28T21:34:43.117 回答