1

我目前正在使用 Dataproc 的 Java 客户端 API 通过 Spring REST 服务触发 Spark 作业。火花工作的基础是:

  1. 初始化火花
  2. 处理数据
  3. 将结果存储到 GS 存储桶 .json 文件中

我存储数据的原因是,当我的 Spark 作业完成并将结果存储在 JSON 文件中时,我可以从 REST 服务读取存储的结果。但是,Dataproc 的 Java 客户端 API 只是触发作业,并不等待作业完成。因此,等待火花工作完成的最佳方式是什么?我不想使用 Object.wait(int time) 因为不同的火花作业会有不同的执行时间。

4

1 回答 1

3

通过 Dataproc REST API,对作业调用 GET 将返回有关作业状态的信息。一般来说,你可以简单地有一个轮询循环:

public static final ImmutableSet<String> TERMINAL_JOB_STATES =
    ImmutableSet.of("CANCELLED", "DONE", "ERROR");

// Initialize this as normal with credentials, setAppName, HttpTransport, etc.
private Dataproc dataproc;

public void waitJob(String projectId, String jobId) throws IOException, InterruptedException {
  Job job = dataproc.projects().regions().jobs().get(projectId, "global", jobId).execute();
  while (!TERMINAL_JOB_STATES.contains(job.getStatus().getState())) {
    System.out.println("Job not done yet; current state: " + job.getStatus().getState());
    Thread.sleep(5000);
    job = dataproc.projects().regions().jobs().get(projectId, "global", jobId).execute();
  }
  System.out.println("Job terminated in state: " + job.getStatus().getState());
}

您可能还希望将.execute()调用包装在try/catch语句IOException中,以防错误是某种暂时的网络连接错误(任何500 HTTP code错误都应该重试)。您可能还需要最长的等待时间,以防某些事情阻止作业完成,或者您无意中重试404 not found错误。

您还应该能够检测404 not found任何抛出的错误IOException;如果您在轮询完成之前不小心进入并删除了作业,或者如果错误导致您在waitJob呼叫失败的情况下进入呼叫,则会发生这种情况SubmitJob。您应该能够尝试尝试GET不存在的工作并查看错误的外观,以避免在这种情况下出现无限循环。

于 2016-02-29T17:36:35.113 回答