我知道这是一个非常基本的问题,但我无法找到我在哪里犯了错误。我的 Reducer 没有从驱动程序代码中调用。如果有人可以帮助我,我将不胜感激。
我的驱动程序代码
package com.mycompany.myorg;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
/**
 * Driver that configures and submits the "Cars Avg Fuel Economy" MapReduce job.
 *
 * <p>Wires {@code carsMapper} and {@code carsReducer} together with
 * {@code Text} keys and {@code IntWritable} values for both the map output
 * and the final output.
 */
public class carsDriver {

    /**
     * Parses the command line, configures the job and blocks until it finishes.
     *
     * <p>Exits the JVM with status 0 when the job succeeds, 1 when it fails,
     * and -1 when the arguments remaining after generic-option parsing are not
     * exactly an input path followed by an output path.
     *
     * @param args generic Hadoop options followed by the input path and the output path
     * @throws IOException            propagated from job setup or submission
     * @throws ClassNotFoundException propagated from {@code waitForCompletion}
     * @throws InterruptedException   propagated from {@code waitForCompletion}
     */
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();

        // GenericOptionsParser applies the generic options to conf and hands
        // back only the arguments that belong to this job.
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("specified input and output path is not correct");
            System.exit(-1);
        }

        // Set up the job details. Job.getInstance replaces the deprecated
        // Job(Configuration, String) constructor.
        Job job = Job.getInstance(conf, "Cars Avg Fuel Economy");
        job.setJarByClass(carsDriver.class);

        // Use the *remaining* arguments for the paths: the raw args array may
        // still start with generic options, which would otherwise be treated
        // as the input/output paths.
        FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        // Mapper and its intermediate key/value types.
        job.setMapperClass(carsMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // No combiner is configured.
        // job.setCombinerClass(carsCombiner.class);

        // Reducer and the final output key/value types.
        job.setReducerClass(carsReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
映射器代码
package com.mycompany.myorg;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class carsMapper extends Mapper<Object, Text, Text, IntWritable> {
private Text mapkey = new Text();
private final static IntWritable mapval = new IntWritable(1);
public void map(Object key, Text Value,Mapper<Object, Text, Text, IntWritable>.Context context ) throws IOException, InterruptedException{
System.out.println("Running the Mapper");
String items[] = Value.toString().split(",");
System.out.println(items[2]+" "+Integer.parseInt(items[23].toString()));
mapkey.set(items[2]);
mapval.set(Integer.parseInt(items[23].toString()));
context.write(mapkey, mapval);
}
}
Reducer 代码
package com.mycompany.myorg;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class carsReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
public void reducer(Text key, Iterable<IntWritable> value,Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
System.out.println("Reducer Code");
Text redKey = new Text();
IntWritable redVal = new IntWritable();
redKey.set(key);
int sum=0;
int count=0;
for(IntWritable val: value){
sum= sum +val.get();
count= count + 1;
}
redVal.set((sum/count));
context.write(redKey, redVal);
}
}