Override run() of the Mapper and Reducer in the new API (org.apache.hadoop.mapreduce). In those methods you can emit the accumulated sum/count from each mapper or reducer. You also need to limit the reducer count to 1, so that you get one global total over the sums produced by the individual mappers. See the code below for clarity:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class AggregationExample extends Configured implements Tool {
    /**
     * The Mapper. It accumulates a running sum over its input split and
     * emits the per-mapper total exactly once, from run().
     */
    public static class MapJob extends Mapper<LongWritable, Text, Text, Text> {

        private Text outputKey = new Text();
        private Text outputValue = new Text();
        private double sum;

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            try {
                // say that you need to sum up the value part
                sum += Double.parseDouble(value.toString());
            } catch (NumberFormatException e) {
                // skip lines that do not hold a numeric value
            }
        }

        @Override
        public void run(Context context) throws IOException, InterruptedException {
            setup(context);
            while (context.nextKeyValue()) {
                map(context.getCurrentKey(), context.getCurrentValue(), context);
            }
            // emit the sum once per mapper
            outputKey.set(String.valueOf(sum));
            context.write(outputKey, outputValue); // note that outputValue stays empty
            cleanup(context);
        }
    }
    /**
     * The Reducer. It adds up the per-mapper sums and emits the global
     * total exactly once, from run().
     */
    public static class ReduceJob extends Reducer<Text, Text, Text, Text> {

        private Text outputKey = new Text();
        private Text outputValue = new Text();
        private double sum;

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException,
                InterruptedException {
            // sum up the per-mapper sums; iterate the grouped values so that
            // two mappers that happen to emit the same sum are both counted
            for (Text ignored : values) {
                sum += Double.parseDouble(key.toString());
            }
        }

        @Override
        public void run(Context context) throws IOException, InterruptedException {
            setup(context);
            while (context.nextKey()) {
                reduce(context.getCurrentKey(), context.getValues(), context);
            }
            // emit the global sum
            outputKey.set(String.valueOf(sum));
            context.write(outputKey, outputValue);
            cleanup(context);
        }
    }
    @Override
    public int run(String[] args) throws Exception {
        try {
            Configuration conf = getConf();
            // the output key/value separator is empty, as the final output
            // consists of the key alone and the value is empty
            // (on newer Hadoop versions the key is "mapreduce.output.textoutputformat.separator")
            conf.set("mapred.textoutputformat.separator", "");
            Job job = new Job(conf);
            job.setJarByClass(AggregationExample.class);
            job.setJobName("Aggregation Example");
            // a single reducer, so that one global sum is computed over all
            // the per-mapper sums
            job.setNumReduceTasks(1);
            job.setMapperClass(MapJob.class);
            job.setReducerClass(ReduceJob.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            FileInputFormat.setInputPaths(job, args[0]);
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            boolean success = job.waitForCompletion(true);
            return success ? 0 : 1;
        } catch (Exception e) {
            e.printStackTrace();
            return 1;
        }
    }
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.out
                    .println("Usage: AggregationExample <comma separated list of input directories> <output dir>");
            System.exit(-1);
        }
        int result = ToolRunner.run(new AggregationExample(), args);
        System.exit(result);
    }
}
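As a side note, if you would rather not override run() in the mapper, an equivalent variant is to emit the sum from cleanup(), which the default run() invokes once after the last input record. A minimal sketch (the class name MapJobWithCleanup is hypothetical; it would sit inside AggregationExample next to MapJob):

    // Hypothetical variant of MapJob: the default Mapper.run() calls setup(),
    // then map() for every record, then cleanup(), so emitting the per-mapper
    // sum from cleanup() yields the same one-record-per-mapper output.
    public static class MapJobWithCleanup extends Mapper<LongWritable, Text, Text, Text> {

        private double sum;

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            sum += Double.parseDouble(value.toString());
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            context.write(new Text(String.valueOf(sum)), new Text());
        }
    }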
You should be able to map this approach onto your own problem quite directly.
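For reference, a hypothetical invocation (the jar name and HDFS paths are placeholders, and each input line is assumed to hold a single numeric value):

hadoop jar aggregation-example.jar AggregationExample /data/numbers /data/sum

Given an input file with the three lines 1.5, 2.0 and 3.5, the single output file would contain one line, 7.0, since the key/value separator is empty and the value part is never set.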