psabbate 的回答让我找到了一些我遗漏的 API。这就是我解决它的方法:
在驱动程序类中,使用如下代码启动作业:
List<RunningJob> runningJobs = new ArrayList<RunningJob>();
for (String jobSpec: jobSpecs) {
// Configure, for example, a params map that gets passed into the MR class's constructor
ToolRunner.run(new Configuration(), new MapReduceClass(params, runningJobs), null);
}
for (RunningJob rj: runningJobs) {
System.err.println("Waiting on job "+rj.getID());
rj.waitForCompletion();
}
然后,在 MapReduceClass 中,定义一个私有变量List<RunningJob> runningJobs
,定义一个构造函数,如下所示:
public MergeAndScore(Map<String, String> p, List<RunningJob> rj) throws IOException {
params = Collections.unmodifiableMap(p);
runningJobs = rj;
}
在调用的run()
方法中,定义你的并提交作业ToolRunner
JobConf
JobClient jc = new JobClient();
jc.init(conf);
jc.setConf(conf);
runningJobs.add(jc.submitJob(conf));
这样,run()
立即返回,并且可以通过runningJobs
驱动程序类中的对象访问作业。
请注意,我正在使用旧版本的 Hadoop,因此jc.init(conf)
和/或jc.setConf(conf)
可能需要也可能不需要,具体取决于您的设置,尽管可能至少需要其中一个。