我目前有两个 hadoop 作业,其中第二个作业需要将第一个作业的输出添加到分布式缓存中。目前我手动运行它们,所以在第一个作业完成后,我将输出文件作为参数传递给第二个作业,其驱动程序将其添加到缓存中。
第一份工作只是一个简单的仅限地图的工作,我希望在按顺序执行两个工作时可以运行一个命令。
任何人都可以帮我编写代码以将第一个作业的输出放入分布式缓存中,以便可以将其传递给第二个作业吗?
谢谢
编辑:这是工作 1 的当前驱动程序:
public class PlaceDriver {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: PlaceMapper <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "Place Mapper");
job.setJarByClass(PlaceDriver.class);
job.setMapperClass(PlaceMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
TextInputFormat.addInputPath(job, new Path(otherArgs[0]));
TextOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
这是 job2 的驱动程序。作业 1 的输出作为第一个参数传递给作业 2 并加载到缓存中
public class LocalityDriver {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 3) {
System.err.println("Usage: LocalityDriver <cache> <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "Job Name Here");
DistributedCache.addCacheFile(new Path(otherArgs[0]).toUri(),job.getConfiguration());
job.setNumReduceTasks(1); //TODO: Will change
job.setJarByClass(LocalityDriver.class);
job.setMapperClass(LocalityMapper.class);
job.setCombinerClass(TopReducer.class);
job.setReducerClass(TopReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
TextInputFormat.addInputPath(job, new Path(otherArgs[1]));
TextOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}