通过 Dataproc REST API,对作业调用 GET 将返回有关作业状态的信息。一般来说,你可以简单地有一个轮询循环:
public static final ImmutableSet<String> TERMINAL_JOB_STATES =
ImmutableSet.of("CANCELLED", "DONE", "ERROR");
// Initialize this as normal with credentials, setAppName, HttpTransport, etc.
private Dataproc dataproc;
public void waitJob(String projectId, String jobId) throws IOException, InterruptedException {
Job job = dataproc.projects().regions().jobs().get(projectId, "global", jobId).execute();
while (!TERMINAL_JOB_STATES.contains(job.getStatus().getState())) {
System.out.println("Job not done yet; current state: " + job.getStatus().getState());
Thread.sleep(5000);
job = dataproc.projects().regions().jobs().get(projectId, "global", jobId).execute();
}
System.out.println("Job terminated in state: " + job.getStatus().getState());
}
您可能还希望将.execute()
调用包装在try/catch
语句IOException
中,以防错误是某种暂时的网络连接错误(任何500 HTTP code
错误都应该重试)。您可能还需要最长的等待时间,以防某些事情阻止作业完成,或者您无意中重试404 not found
错误。
您还应该能够检测404 not found
任何抛出的错误IOException
;如果您在轮询完成之前不小心进入并删除了作业,或者如果错误导致您在waitJob
呼叫失败的情况下进入呼叫,则会发生这种情况SubmitJob
。您应该能够尝试尝试GET
不存在的工作并查看错误的外观,以避免在这种情况下出现无限循环。