1

我目前有两个 hadoop 作业,其中第二个作业需要将第一个作业的输出添加到分布式缓存中。目前我手动运行它们,所以在第一个作业完成后,我将输出文件作为参数传递给第二个作业,其驱动程序将其添加到缓存中。

第一份工作只是一个简单的仅限地图的工作,我希望在按顺序执行两个工作时可以运行一个命令。

任何人都可以帮我编写代码以将第一个作业的输出放入分布式缓存中,以便可以将其传递给第二个作业吗?

谢谢

编辑:这是工作 1 的当前驱动程序:

public class PlaceDriver {

public static void main(String[] args) throws Exception {

    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
        System.err.println("Usage: PlaceMapper <in> <out>");
        System.exit(2);
    }
    Job job = new Job(conf, "Place Mapper");
    job.setJarByClass(PlaceDriver.class);
    job.setMapperClass(PlaceMapper.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    TextInputFormat.addInputPath(job, new Path(otherArgs[0]));
    TextOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

这是 job2 的驱动程序。作业 1 的输出作为第一个参数传递给作业 2 并加载到缓存中

public class LocalityDriver {

public static void main(String[] args) throws Exception {

    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 3) {
        System.err.println("Usage: LocalityDriver <cache> <in> <out>");
        System.exit(2);
    }
    Job job = new Job(conf, "Job Name Here");
    DistributedCache.addCacheFile(new Path(otherArgs[0]).toUri(),job.getConfiguration());
    job.setNumReduceTasks(1); //TODO: Will change
    job.setJarByClass(LocalityDriver.class);
    job.setMapperClass(LocalityMapper.class);
    job.setCombinerClass(TopReducer.class);
    job.setReducerClass(TopReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    TextInputFormat.addInputPath(job, new Path(otherArgs[1]));
    TextOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
4

4 回答 4

1

在同一个主目录中创建两个作业对象。让第一个等待完成,然后再运行另一个。

public class DefaultTest extends Configured implements Tool{


    public int run(String[] args) throws Exception {

        Job job = new Job();

        job.setJobName("DefaultTest-blockx15");

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        job.setNumReduceTasks(15);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setJarByClass(DefaultTest.class);

        job.waitForCompletion(true):

                job2 = new Job(); 

                // define your second job with the input path defined as the output of the previous job.


        return 0;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        ToolRunner.run(new DefaultTest(), otherArgs);
    }
 }
于 2012-04-30T01:42:31.977 回答
0

MapReduce 中的作业链是非常常见的场景。你可以试试cascading,一个开源的 MapReduce 工作流管理软件。这里有一些关于级联的讨论。或者您可以在此处查看与您类似的讨论。

于 2012-04-26T02:11:39.673 回答
0

您还可以使用 ChainMapper、JobControl 和 ControlledJob 来控制您的工作流程

Configuration config = getConf();

Job j1 = new Job(config);
Job j2 = new Job(config);
Job j3 = new Job(config);

j1.waitForCompletion(true);


JobControl jobFlow = new JobControl("j2");
ControlledJob cj3 = new ControlledJob(j2, null);
jobFlow.addJob(cj3);
jobFlow.addJob(new ControlledJob(j2, Lists.newArrayList(cj3)));
jobFlow.addJob(new ControlledJob(j3, null));
jobFlow.run();
于 2015-01-27T18:54:14.403 回答
0

一个直接的答案是将两个主要方法的代码提取到两个单独的方法中,例如:boolean job1()boolean job2() 在一个主要方法中依次调用它们,如下所示:

public static void main(String[] args) throws Exception {
   if (job1()) {
      jobs2();
   }
}

其中job1和调用的返回值是job2调用的结果job.waitForCompletion(true)

于 2012-04-25T08:49:18.993 回答