I am trying to chain two sequential M/R jobs in hadoop with the following code. Basically, right after the first job completes, I make another job that uses the first job's output as input. But the code does not produce the output for the second job, and it didn't throw any exception. Can you guys help me spot where might be wrong? I appreciate it.
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
if (otherArgs.length != 3) {
System.err.println("Usage: jobStats <in> <out> <job>");
System.exit(2);
}
conf.set("job", otherArgs[2]);
Job job = new Job(conf, "job count");
job.setJarByClass(jobStats.class);
job.setMapperClass(jobMapper.class);
job.setCombinerClass(jobReducer.class);
job.setReducerClass(jobReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
boolean completionStatus1 = job.waitForCompletion(true);
if (completionStatus1 == true)
{
Job job2 = new Job(conf, "job year ranking");
job2.setJarByClass(jobStats.class);
job2.setPartitionerClass(ChainedPartitioner.class);
job2.setGroupingComparatorClass(CompKeyGroupingComparator.class);
job2.setSortComparatorClass(CompKeyComparator.class);
job2.setMapperClass(ChainedMapper.class);
job2.setReducerClass(ChainedReducer.class);
job2.setPartitionerClass(ChainedPartitioner.class);
job2.setMapOutputKeyClass(CompositeKey.class);
job2.setMapOutputValueClass(IntWritable.class);
job2.setOutputKeyClass(Text.class);
job2.setOutputValueClass(IntWritable.class);
Path outPath = new Path(otherArgs[1] + "part-r-00000"); // this is the hard-coded output of first job
FileSystem fs = FileSystem.get(conf);
if (fs.exists(outPath))
{
FileInputFormat.addInputPath(job2, outPath);
FileOutputFormat.setOutputPath(job2, new Path("/user/tony/output/today"));
boolean completionStatus2 = job2.waitForCompletion(true);
if (completionStatus2 == true)
{
fs.delete(outPath, true);
System.exit(0);
}
else System.exit(1);
}
else System.exit(1);
}
}