As "SSaikia_JtheRocker" explained, mapper tasks are created based on the total number of logical splits of the HDFS blocks. I'd like to add something to question #3: "How is an input file spread among the mappers? Is one mapper's output spread among multiple reducers (is this done by the framework, or can you change it)?" For example, consider my word-count program, which counts the words in a file, shown below:
#
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    public void map(LongWritable key, Text value, Context context) // Context is where the output goes
            throws IOException, InterruptedException {
        // value = "How Are You"
        String line = value.toString(); // converts Hadoop's Text "How Are You" into a plain Java String
        StringTokenizer tokenizer = new StringTokenizer(line); // yields the tokens "How", "Are", "You" one at a time
        while (tokenizer.hasMoreTokens()) // hasMoreTokens() returns true while tokens remain
        {
            value.set(tokenizer.nextToken()); // value is overwritten with the current word, e.g. "How"
            context.write(value, new IntWritable(1)); // emit the (word, 1) pair to the map output
            // How, 1
            // Are, 1
            // You, 1
        }
        // map() itself is invoked once per line of the input split
    }
}
#
So in the program above, the StringTokenizer splits the line "How Are You" into 3 words, and the while loop emits a (word, 1) pair for each of them, i.e. 3 output pairs here. Note that it is the map() method that runs once per line of input; the number of mapper tasks themselves is still determined by the number of input splits, as explained above.
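To complete the word-count picture, here is a minimal matching reducer sketch. The class name WCReducer is my own; the pattern is the standard Hadoop Reducer. The framework groups the mapper output by key, so reduce() receives one word together with all the 1s emitted for it and sums them.

```java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // For the line "How Are You" this method is called once per
        // distinct word, e.g. key = "How", values = {1}.
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get(); // add up all the 1s emitted for this word
        }
        context.write(key, new IntWritable(sum)); // e.g. (How, 1)
    }
}
```

The driver below wires a reducer in with job.setReducerClass(...).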
As for reducers, we can specify how many reducers we want to produce our output with the statement 'job.setNumReduceTasks(5);'. The code snippet below should give you an idea.
#
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class BooksMain {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Use the programArgs array to retrieve the program arguments.
        String[] programArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();
        Job job = Job.getInstance(conf); // replaces the deprecated new Job(conf)
        job.setJarByClass(BooksMain.class);
        job.setMapperClass(BookMapper.class);
        job.setReducerClass(BookReducer.class);
        job.setNumReduceTasks(5);
        // job.setCombinerClass(BookReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // TODO: Update the input path for the location of the inputs of the map-reduce job.
        FileInputFormat.addInputPath(job, new Path(programArgs[0]));
        // TODO: Update the output path for the output directory of the map-reduce job.
        FileOutputFormat.setOutputPath(job, new Path(programArgs[1]));
        // Submit the job and wait for it to finish.
        job.waitForCompletion(true);
        // Submit and return immediately:
        // job.submit();
    }
}
#
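Regarding how one mapper's output is spread among the reducers: by default the framework's HashPartitioner assigns each key to a reducer by hashing it, and you can change this with job.setPartitionerClass(...). A minimal sketch that mimics the default behaviour (the class name BookPartitioner is my own):

```java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Every (key, value) pair a mapper emits is sent to reducer number
// getPartition(...), so all pairs with the same key end up at the
// same reducer.
public class BookPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numReduceTasks) {
        // Mask off the sign bit so the result is non-negative, then take
        // the remainder modulo the number of reducers (5 in the job above).
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}
```

To use it, add job.setPartitionerClass(BookPartitioner.class); to the driver above. This is how you change the mapper-to-reducer distribution the question asks about.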