我在 flink 1.11.1 中写了一个 flink 批处理作业。工作成功完成后,我想做一些类似调用http服务的事情。
我添加了一个简单的作业侦听器来挂钩作业状态。问题是当 kafka sink 操作员抛出错误时,作业侦听器不会触发。我希望当我的工作失败时,它应该触发我的工作监听器并打印失败日志。
我如何确定工作是否成功完成?
任何帮助将不胜感激。
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.registerJobListener(new JobListener {
override def onJobSubmitted(jobClient: JobClient, throwable: Throwable): Unit = {
if (throwable == null) {
log.info("SUCCESS")
} else {
log.info("FAIL")
}
}
override def onJobExecuted(jobExecutionResult: JobExecutionResult, throwable: Throwable): Unit = {
if (throwable == null) {
log.info("SUCCESS")
} else {
log.info("FAIL")
}
}
})
env.createInput(input)
.filter(r => Option(r.token).getOrElse("").nonEmpty)
.addSink(kafkaProducer)