我注意到您使用的是 0.18 版本的文档。这是 1.0.2(最新)的链接。
第一个建议 - 使用 IDE(eclipse、IDEA 等)。这真的有助于填补空白。
在实际的 HDFS 中,您无法知道每个文件的位置(不同的机器和集群)。无法保证 X 行甚至与 Y 行驻留在同一个磁盘上。也不能保证 X 行不会被拆分到不同的机器上(HDFS 以块的形式分布数据,通常每个 64Mb)。这意味着您不能假设同一个映射器将处理整个文件。您可以确保每个文件都由同一个 reducer 处理。
由于从映射器发送的每个键的减速器都是唯一的,所以我这样做的方法是使用文件名作为映射器中的输出键。此外,映射器的默认输入类是TextInputFormat
,这意味着每个映射器将自己接收一整行(由 LF 或 CR 终止)。然后,您可以从映射器发出文件名和数字 1(或其他与计算无关的)。然后,在 reducer 中,您只需使用循环来计算文件名被接收的次数:
在映射器的 map 函数中
public static class Map extends Mapper<IntWritable, Text, Text, Text> {
public void map(IntWritable key, Text value, Context context) {
// get the filename
InputSplit split = context.getInputSplit();
String fileName = split.getPath().getName();
// send the filename to the reducer, the value
// has no meaning (I just put "1" to have something)
context.write( new Text(fileName), new Text("1") );
}
}
在 reducer 的 reduce 函数中
public static class Reduce extends Reducer<Text, Text, Text, Text> {
public void reduce(Text fileName, Iterator<Text> values, Context context) {
long rowcount = 0;
// values get one entry for each row, so the actual value doesn't matter
// (you can also get the size, I'm just lazy here)
for (Text val : values) {
rowCount += 1;
}
// fileName is the Text key received (no need to create a new object)
context.write( fileName, new Text( String.valueOf( rowCount ) ) );
}
}
在驱动程序/主要
您几乎可以使用与 wordcount 示例相同的驱动程序 - 请注意,我使用了新的 mapreduce API,因此您需要调整一些东西(Job
而不是JobConf
等)。当我阅读它时,这真的很有帮助。
请注意,您的 MR 输出将只是每个文件名及其行数:
input1.txt 3
input2.txt 4
input3.txt 9
如果您只想计算所有文件中的总行数,只需在所有映射器中发出相同的键(而不是文件名)。这样就只有一个 reducer 来处理所有的行计数:
// no need for filename
context.write( new Text("blah"), new Text("1") );
您还可以将处理每个文件行数的输出的作业链接起来,或者做其他花哨的事情——这取决于您。
我留下了一些样板代码,但基础知识就在那里。一定要检查我,因为我是从记忆中输入的大部分内容.. :)
希望这可以帮助!