
I am trying to chain two sequential MapReduce jobs in Hadoop with the following code. Right after the first job completes, I create a second job that uses the first job's output as its input. However, the second job produces no output, and no exception is thrown. Can you help me spot what might be wrong? I appreciate it.

```java
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args)
            .getRemainingArgs();
    if (otherArgs.length != 3) {
        System.err.println("Usage: jobStats <in> <out> <job>");
        System.exit(2);
    }

    conf.set("job", otherArgs[2]);
    Job job = new Job(conf, "job count");
    job.setJarByClass(jobStats.class);
    job.setMapperClass(jobMapper.class);
    job.setCombinerClass(jobReducer.class);
    job.setReducerClass(jobReducer.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

    boolean completionStatus1 = job.waitForCompletion(true);
    if (completionStatus1 == true)
    {
        Job job2 = new Job(conf, "job year ranking");
        job2.setJarByClass(jobStats.class);
        job2.setPartitionerClass(ChainedPartitioner.class);
        job2.setGroupingComparatorClass(CompKeyGroupingComparator.class);
        job2.setSortComparatorClass(CompKeyComparator.class);

        job2.setMapperClass(ChainedMapper.class);
        job2.setReducerClass(ChainedReducer.class);
        job2.setMapOutputKeyClass(CompositeKey.class);
        job2.setMapOutputValueClass(IntWritable.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(IntWritable.class);

        Path outPath = new Path(otherArgs[1] + "part-r-00000"); // this is the hard-coded output of first job
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outPath))
        {
            FileInputFormat.addInputPath(job2, outPath);
            FileOutputFormat.setOutputPath(job2, new Path("/user/tony/output/today"));

            boolean completionStatus2 = job2.waitForCompletion(true);
            if (completionStatus2 == true)
            {
                fs.delete(outPath, true);
                System.exit(0);
            }
            else System.exit(1);
        }
        else System.exit(1);
    }
}
```

1 Answer


The ChainedMapper and ChainedReducer classes are used to chain several mappers together inside a single MapReduce job, along the lines of M1-M2-M3-R-M4-M5.
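To make the M1-M2-M3-R-M4-M5 shape concrete, here is a small in-memory model in plain Java (Java 9+ for `List.of`, no Hadoop dependency): one "job" whose records pass through two mappers in sequence, then a single reduce, then one more mapper. The class and method names (`MapperChainModel`, `applyChain`, `reduceCount`, `runJob`) are hypothetical and the stages are deliberately trivial; it sketches the data flow only. In Hadoop's `org.apache.hadoop.mapreduce` API the corresponding classes are `ChainMapper` and `ChainReducer` in `org.apache.hadoop.mapreduce.lib.chain`, configured through `ChainMapper.addMapper(...)` and `ChainReducer.setReducer(...)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

// In-memory model (NOT Hadoop code) of ONE job shaped like M1 -> M2 -> R -> M3.
// Stages are one-to-one here for simplicity; real Hadoop mappers may emit 0..n records.
public class MapperChainModel {

    // Feeds every record through the stages in order:
    // the output of stage i becomes the input of stage i + 1.
    static List<String> applyChain(List<String> records, List<Function<String, String>> stages) {
        List<String> current = records;
        for (Function<String, String> stage : stages) {
            List<String> next = new ArrayList<>();
            for (String r : current) {
                next.add(stage.apply(r));
            }
            current = next;
        }
        return current;
    }

    // The job's single reduce step: count occurrences per key (TreeMap keeps keys sorted).
    static Map<String, Integer> reduceCount(List<String> keys) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String k : keys) {
            counts.merge(k, 1, Integer::sum);
        }
        return counts;
    }

    static List<String> runJob(List<String> input) {
        // M1 (trim) and M2 (lower-case) run before the reduce, inside the same job.
        List<Function<String, String>> preReduce =
                List.of(String::trim, s -> s.toLowerCase(Locale.ROOT));
        Map<String, Integer> reduced = reduceCount(applyChain(input, preReduce));

        // M3 runs after the reduce, still inside the same job: format "key<TAB>count".
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Integer> e : reduced.entrySet()) {
            out.add(e.getKey() + "\t" + e.getValue());
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(runJob(List.of(" Nurse", "nurse ", "Teacher")));
    }
}
```

The point of the model is that every stage runs inside one job with one shuffle. It does not give you a second, independent job with its own partitioner, sort comparator, and grouping comparator, which is what the question's `job2` is configured for.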

In your case, you want to run two complete MapReduce jobs one after the other. Just specify a real Mapper class for the second job.
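Running two ordinary jobs back to back, where the second job has a normal mapper that reads the first job's output, can be modeled in plain Java as below (again without Hadoop, Java 9+). `job1` stands in for the "job count" job and `job2` for the "job year ranking" job. The class name, the tab-separated `key<TAB>count` record format, and the ranking rule (highest count first, ties broken by key) are assumptions for illustration, not the question's actual logic.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// In-memory model (NOT Hadoop code) of two complete MapReduce jobs run in sequence.
// Job 2 starts only after job 1 has finished, and its input is job 1's output.
public class SequentialJobsModel {

    // Job 1 ("count"): map emits (key, 1) per line, reduce sums per key.
    static List<String> job1(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            counts.merge(line.trim(), 1, Integer::sum);
        }
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            out.add(e.getKey() + "\t" + e.getValue()); // written as "key<TAB>count"
        }
        return out;
    }

    // Job 2 ("ranking"): an ordinary mapper parses job 1's lines into (key, count);
    // the sort/reduce side orders them by count, highest first, ties broken by key.
    static List<String> job2(List<String> job1Output) {
        List<String[]> records = new ArrayList<>();
        for (String line : job1Output) {
            records.add(line.split("\t")); // mapper: "key\tcount" -> [key, count]
        }
        records.sort(Comparator.comparingInt((String[] r) -> Integer.parseInt(r[1]))
                .reversed()
                .thenComparing(r -> r[0]));
        List<String> out = new ArrayList<>();
        for (String[] r : records) {
            out.add(r[0] + "\t" + r[1]);
        }
        return out;
    }

    // Driver: the jobs run strictly one after the other; job 2 reads what job 1 wrote.
    static List<String> runPipeline(List<String> input) {
        List<String> intermediate = job1(input);
        return job2(intermediate);
    }

    public static void main(String[] args) {
        System.out.println(runPipeline(
                List.of("nurse", "teacher", "nurse", "clerk", "nurse", "teacher")));
    }
}
```

In the real driver, the equivalent of passing `intermediate` along is giving job 2 the first job's output directory (`otherArgs[1]`) as its input path; `FileInputFormat` reads the files in that directory and skips names starting with `_` or `.`. Also note that `otherArgs[1] + "part-r-00000"` in the question has no `/` separator, so unless `<out>` ends with a slash it names a path such as `/user/tony/outpart-r-00000`. In that case `fs.exists(outPath)` is false and the program exits with status 1 without running job 2 or printing an error, which matches the "no output, no exception" symptom. `new Path(otherArgs[1], "part-r-00000")` joins the two parts correctly.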

answered 2013-10-29T15:08:46.217