1

我正在尝试执行 MapReduce 作业,并且我想定期轮询其状态。我正在尝试在代码中使用 Jobclient 类的 submitJob(jobConf) 方法,如下所示

 JobConf conf = new JobConf(SampleJobClass.class)   
 JobClient j= new JobClient();
 j.setConf(conf);
 System.out.println("from conf" +j.getConf().toString());
 RunningJob submitJob=j.submitJob(conf);`

我不断收到空指针错误j.submitJob(conf) 错误堆栈是:

Exception in thread "main" java.lang.NullPointerException
    at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:844)
    at org.apache.hadoop.mapred.JobClient.submitJob(JobClient.java:818)

关于我做错了什么的任何想法?

4

2 回答 2

0

根据提供的信息,我假设您使用 hadoop-2.0.0-mr1-cdh4.0.0 / 4.0.1。
在第 844 行:

return ugi.doAs(new PrivilegedExceptionAction<RunningJob>() {

NPE 可能发生,因为 ugi 为空。( UserGroupInformation ) 这是在 init()方法中设置的,但在默认构造函数中没有调用。因此,手动调用它:

j.init(conf); 

或简单地使用:

j = new JobClient(conf);

它在内部调用init()

另一方面,我宁愿在命令行中运行 Hadoop 作业,然后执行一些轮询作业信息的客户端代码,例如:

InetSocketAddress jobtrackerAddr = new InetSocketAddress("myhost",8021);
Configuration conf = new Configuration();
JobClient jobClient = new JobClient(jobtrackerAddr, conf);
JobStatus[] runningJobs = jobClient.jobsToComplete();
...

(要检索多个集群信息,请参阅我之前的回答

于 2012-12-08T00:58:05.400 回答
0

我也有这个问题。但是使用j.runJob(conf);效果很好。你也可以使用静态函数JobClient.runJob(conf); 我不知道为什么,但我去阅读JobClienta的源代码并找到静态函数runJob的实现。

public static RunningJob runJob(JobConf job) throws IOException {
JobClient jc = new JobClient(job);
RunningJob rj = jc.submitJob(job);
try {
  if (!jc.monitorAndPrintJob(job, rj)) {
    throw new IOException("Job failed!");
  }
} catch (InterruptedException ie) {
  Thread.currentThread().interrupt();
}
return rj;

所以我以这种方式使用 submitJob 功能:

JobClient jcli = new JobClient(jconf);
        RunningJob rJob = jcli.submitJob(jconf);
        while (true){
            Thread.sleep(5000);
            System.out.println(rJob);
            if (rJob.isComplete())
                break;
        }

我工作!也许这对你也有用。

于 2013-06-22T04:24:33.360 回答