2

我试图弄清楚如何链接多个 hadoop 作业,将一个步骤的输出馈送到下一步的输入。我通过谷歌搜索发现的很多东西说我应该从单个线程一次调用它们并等待完成,或者我应该使用 Job.addDependingJob() 然后提交它们。我选择了后者,但在前一个工作完成后,我似乎无法执行后续工作。

这是我的代码:

List<Job> jobs = new ArrayList<Job>();

for(int i = 0; i < stepCount; i++) {
    JobConf jc = new JobConf(clusterConfig);

    ... set up mappers and reducers here ...

    ... set up input and output paths here ...

    Job j = new Job(jc);
    j.addDependingJob(jobs.get(jobs.size() - 1);
    jobs.add(j);
}

for(Job j : Jobs) {
    JobClient client = new JobClient();
    client.init(j.getJobConf());
    client.submit(j.getJobConf());
}

所有作业同时运行,我得到如下输出:

  • 没有设置作业 jar 文件。可能找不到用户类。请参阅 JobConf(Class) 或 JobConf#setJar(String)。
  • 处理的总输入路径:1
  • 使用 GenericOptionsParser 解析参数。应用程序应该实现同样的工具。
  • 没有设置作业 jar 文件。可能找不到用户类。请参阅 JobConf(Class) 或 JobConf#setJar(String)。
  • 处理的总输入路径:0
  • 使用 GenericOptionsParser 解析参数。应用程序应该实现同样的工具。
  • 没有设置作业 jar 文件。可能找不到用户类。请参阅 JobConf(Class) 或 JobConf#setJar(String)。
  • 处理的总输入路径:0
  • 使用 GenericOptionsParser 解析参数。应用程序应该实现同样的工具。
  • 没有设置作业 jar 文件。可能找不到用户类。请参阅 JobConf(Class) 或 JobConf#setJar(String)。
  • 处理的总输入路径:0

我究竟做错了什么?

注意:我使用的是 Hadoop 0.20.205

编辑澄清:我需要能够向集群提交作业链,然后立即返回,而无需等待作业链完成。

4

3 回答 3

2

JobControl 应该用于设置作业之间的依赖关系。给定代码中未设置依赖项,因此作业并行运行,而不是按顺序运行。如果有更复杂的工作流程,则可以使用 Oozie。

是一篇有趣的文章。

于 2013-03-21T16:20:39.690 回答
1

自从我处理这个问题以来已经有几年了,但我看到了一些事情:

  1. 您的错误与作业之间的链接没有任何关系。在担心链接它们之前,请确保您可以运行单个作业。
  2. Jobcontrol 不会(或至少在 2010 年没有)将作业序列提交给作业跟踪器,它只是一个用于检查上游作业何时完成并自动将下一个作业提交给作业跟踪器的工具。您将无法仅运行它并退出。
  3. 您不应该在作业上调用提交。这将提交他们运行。您应该将控制权转移到那里的某个地方的工作控制权。

我认为这很令人困惑,并开始在https://github.com/kevinpet/jobcontrol中编写我自己的 DAG 助手,您可能会或可能不会觉得有用。

于 2013-03-22T06:49:57.127 回答
-1

以下是链接 map reduce 作业的方式。在这里,我们在第一个作业的输出上运行第二个作业:

        Job job1 = new Job(conf, "job1");
    Job job2 = new Job(conf,"job2");
    job1.setJarByClass(driver.class);
    job1.setMapperClass(Map.class);
    job1.setCombinerClass(Reduce.class);
    job1.setReducerClass(Reduce.class);
    job1.setOutputKeyClass(Text.class);
    job1.setOutputValueClass(IntWritable.class);
    String outputpath="/user/hadoop/firstjoboutput";
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(outputpath));
    job1.waitForCompletion(true);

    job2.setJarByClass(driver.class);
    job2.setMapperClass(SecondMap.class);
    job2.setReducerClass(SecondReducer.class);
    job2.setMapOutputKeyClass(IntWritable.class);
    job2.setMapOutputValueClass(Text.class);
    job2.setOutputKeyClass(Text.class);
    job2.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job2, new Path(outputpath));
    String finaloutput="/user/hadoop/finaloutput";
    FileOutputFormat.setOutputPath(job2, new Path(finaloutput));


    System.exit(job2.waitForCompletion(true) ? 0 : 1);
于 2013-03-22T06:04:40.687 回答