5

我想在 flink 作业完成后执行一些任务,我在 Intellij 中运行代码时没有任何问题,但是当我在 shell 文件中运行 Flink jar 时出现问题。我正在使用下面的行来确保 flink 程序的执行完成

//start the execution

JobExecutionResult jobExecutionResult = envrionment.execute(" Started the execution ");

 is_job_finished = jobExecutionResult.isJobExecutionResult();

我不确定上述检查是否正确?

然后我在下面的方法中使用上面的变量来执行一些任务

    if(print_mode && is_job_finished){



        System.out.println(" \n \n -- System related  variables  -- \n");

        System.out.println(" Stream_join Window length = " + WindowLength_join__ms + " milliseconds");
        System.out.println(" Input rate for stream RR  = " + input_rate_rr_S + " events/second");
        System.out.println("Stream RR Runtime = " + Stream_RR_RunTime_S + " seconds");
        System.out.println(" # raw events in stream RR  = " + Total_Number_Of_Events_in_RR + "\n");

}

有什么建议么 ?

4

3 回答 3

0

如果不是 SQL API,JobListener 就是 grate 程序。

如果使用 SQL API,则永远不会调用 onJobExecuted。我有一个想法,你可以参考一下。source 是 Kafka,sink 可以使用任何类型。 在此处输入图像描述

让我解释一下:

  • EndSign:跟随到最后一个数据。当您的 Flink 作业消耗它时,这意味着分区元素其余部分为空。

关闭逻辑:

  • 当您 flink 作业处理 EndSign 时。job需要调用JobController,然后JobController计数器+1
  • 直到 JobController 计数器等于分区计数。然后 JobController 将检查消费者组延迟,确保 Flink 作业获取所有数据。
  • 现在,我们知道工作已经完成
于 2021-10-03T07:54:20.813 回答
0

将JobListener注册到您的 StreamExecutionEnvironment。

于 2020-11-16T22:02:15.420 回答
0

您可以将作业侦听器注册到执行环境。

例如

env.registerJobListener(new JobListener {
      //Callback on job submission.
      override def onJobSubmitted(jobClient: JobClient, throwable: Throwable): Unit = {
        if (throwable == null) {
          log.info("SUBMIT SUCCESS")
        } else {
          log.info("FAIL")
        }
      }
    //Callback on job execution finished, successfully or unsuccessfully.
      override def onJobExecuted(jobExecutionResult: JobExecutionResult, throwable: Throwable): Unit = {

        if (throwable == null) {
          log.info("SUCCESS")
        } else {
          log.info("FAIL")
        }
      }
    })
于 2020-08-27T14:23:24.393 回答