
I have been running an Apache Beam-based data ingestion job that parses input CSV files and writes the data to a sink. The job works fine when jobs are submitted one at a time (i.e., under normal load).

But recently, when I started load testing, I began scheduling multiple jobs sequentially in a loop and observed some exceptions and job failures. Currently, I am using a script to schedule these jobs one after another through the Flink REST API, submitting the next job once the previous one reaches the RUNNING state. Sometimes all the jobs execute without any issue, but most of the time 2 or 3 out of 9 jobs fail with the exceptions below. I have also tried the job with several different input CSV files, but it shows similar behavior.
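For reference, the sequential submission described above can be scripted against Flink's REST API roughly as follows. This is a minimal sketch; the host, jar id, and program arguments are placeholders, not values from the original post:

```shell
# Minimal sketch of the submission loop (hypothetical host, jar id, and
# arguments -- none of these values are from the original post).
FLINK_HOST="http://localhost:8081"
JAR_ID="my-ingest-job.jar"   # id returned by POST /jars/upload

# Generate one REST submit command per job; review the file, then run it with sh.
for i in $(seq 1 9); do
  echo "curl -s -X POST $FLINK_HOST/jars/$JAR_ID/run -H 'Content-Type: application/json' -d '{\"programArgs\": \"--input=input_$i.csv\"}'"
done > submit_jobs.sh
```

Each generated line submits the jar once via Flink's `POST /jars/:jarid/run` endpoint; polling `GET /jobs/:jobid` for the RUNNING state between submissions would mirror the pacing described above.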

Exception 1:

2020-06-02 11:22:45,686 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler    - Unhandled exception.
org.apache.flink.client.program.ProgramInvocationException: The program plan could not be fetched - the program aborted pre-maturely.

System.err: (none)

System.out: (none)
        at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:108)
        at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
        at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:128)
        at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
        at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Exception 2:

2020-06-02 11:22:46,035 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler    - Unhandled exception.
org.apache.flink.client.program.ProgramInvocationException: The program caused an error: 
        at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:93)
        at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
        at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:128)
        at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
        at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
        at org.apache.flink.client.program.OptimizerPlanEnvironment.execute(OptimizerPlanEnvironment.java:54)
        at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:139)
        at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:113)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:315)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:301)
        at com.spotify.scio.ScioContext.execute(ScioContext.scala:598)
        at com.spotify.scio.ScioContext$$anonfun$run$1.apply(ScioContext.scala:586)
        at com.spotify.scio.ScioContext$$anonfun$run$1.apply(ScioContext.scala:574)
        at com.spotify.scio.ScioContext.requireNotClosed(ScioContext.scala:694)
        at com.spotify.scio.ScioContext.run(ScioContext.scala:574)
        at com.sparkcognition.foundation.ingest.jobs.delimitedingest.DelimitedIngest$.main(DelimitedIngest.scala:70)
        at com.sparkcognition.foundation.ingest.jobs.delimitedingest.DelimitedIngest.main(DelimitedIngest.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:557)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:449)
        at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
        ... 7 more

The job jar is developed in Scala using Spotify's Scio library (0.8.0). The Flink cluster has the following specs:

  • Flink version 1.8.3
  • 1 master node and 2 worker nodes
  • 32 task slots and 2 task executors
  • Job manager heap size = 48 GB
  • Task executor heap size = 24 GB

1 Answer


If your code wraps env.execute() in a try/catch block that catches the exception, submission through the REST API or the UI can fail. I see the stack trace is similar, although the behavior differs: for you it sometimes succeeds and sometimes fails, whereas with my code REST submission consistently failed. Check the Stack Overflow reference below and see whether it helps: Issues in submitting a job from the Flink Job UI (Exception: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException)
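To illustrate why a broad catch around env.execute() can break submission: during the plan-fetch phase, Flink's REST handler runs the jar's main() and relies on a sentinel ProgramAbortException (an Error in Flink 1.8) propagating back out; a catch-all in main() swallows it and the client reports "The program plan could not be fetched". The class below is a self-contained stand-in for this mechanism, not actual Flink code:

```java
// Stand-in for org.apache.flink.client.program.OptimizerPlanEnvironment.ProgramAbortException,
// which extends Error in Flink 1.8.
class ProgramAbortException extends Error {}

public class PlanExtractionSketch {

    // Stand-in for env.execute(): during plan extraction, Flink throws the sentinel
    // instead of actually running the job.
    static void execute() {
        throw new ProgramAbortException();
    }

    // Anti-pattern: a catch-all also catches the sentinel, so the REST handler
    // never sees it and the plan fetch fails.
    static boolean runWithCatchAll() {
        try {
            execute();
        } catch (Throwable t) {   // too broad: swallows ProgramAbortException
            return false;         // sentinel lost here
        }
        return true;
    }

    // Fix: rethrow the sentinel (or avoid catching Error at all) so plan
    // extraction can observe it; handle only genuine application exceptions.
    static boolean runWithRethrow() {
        try {
            execute();
        } catch (ProgramAbortException e) {
            throw e;              // let the plan-extraction machinery see it
        } catch (Exception e) {
            return false;         // real application errors only
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("catch-all swallowed sentinel: " + !runWithCatchAll());
        boolean rethrown = false;
        try {
            runWithRethrow();
        } catch (ProgramAbortException e) {
            rethrown = true;
        }
        System.out.println("rethrow preserved sentinel: " + rethrown);
    }
}
```

In a Scio job, the equivalent fix is to avoid a blanket `catch { case _ => ... }` (or `catch (Throwable t)`) around the pipeline's run/execute call, since such a handler intercepts the sentinel Error.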

Answered 2020-06-19T07:10:02.953