0

我在 flink 1.11.1 中写了一个 flink 批处理作业。工作成功完成后,我想做一些类似调用http服务的事情。

我添加了一个简单的作业侦听器来挂钩作业状态。问题是当 kafka sink 操作员抛出错误时,作业侦听器不会触发。我希望当我的工作失败时,它应该触发我的工作监听器并打印失败日志。

我如何确定工作是否成功完成?

任何帮助将不胜感激。

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.registerJobListener(new JobListener {
      override def onJobSubmitted(jobClient: JobClient, throwable: Throwable): Unit = {
        if (throwable == null) {
          log.info("SUCCESS")
        } else {
          log.info("FAIL")
        }
      }

      override def onJobExecuted(jobExecutionResult: JobExecutionResult, throwable: Throwable): Unit = {

        if (throwable == null) {
          log.info("SUCCESS")
        } else {
          log.info("FAIL")
        }
      }
    })

    env.createInput(input)
      .filter(r => Option(r.token).getOrElse("").nonEmpty)
      .addSink(kafkaProducer)
4

1 回答 1

0

如果您尝试在集群上运行作业,您可以使用您的作业 ID 在控制台中查看您的记录器消息和标准输出。请参考随附的屏幕截图,

如果您在本地集群上运行,默认 url 可能是 http://localhost:8081。

同样,以下不是检查您的工作是否成功的正确方法。

if (throwable == null) {
          log.info("SUCCESS")
        } else {
          log.info("FAIL")
        }

在此处输入图像描述

于 2020-08-28T10:47:38.540 回答