
I have a Flink job that runs fine when I upload it to my cluster manually (through the UI).

But when I try to deploy it through the RestClusterClient, it fails with a ClassNotFoundException. I do see the job appear on the cluster, and then it fails.

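import java.io.File
import org.apache.flink.client.deployment.StandaloneClusterId
import org.apache.flink.client.program.PackagedProgram
import org.apache.flink.client.program.rest.RestClusterClient
import org.apache.flink.configuration.{Configuration, JobManagerOptions}

// Wrap the same fat jar that works when uploaded through the UI
val packagedProgram = new PackagedProgram(
    new File("/home/laurent/Projects/lead-job-runner/target/lead-0.1-jar-with-dependencies.jar"),
    Array("--kafka-bootstrap-servers", "kafka:29092"): _*
)

// Point the client at the JobManager
val configuration = new Configuration()
configuration.setString(JobManagerOptions.ADDRESS, "localhost")
configuration.setString("jobmanager.rpc.port", "6123")

val clusterClient = new RestClusterClient[StandaloneClusterId](
    configuration,
    StandaloneClusterId.getInstance()
)

// Submit the program with a parallelism of 2
clusterClient.run(packagedProgram, 2)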

Here is the JobManager log:

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not instantiate outputs in order.
    at org.apache.flink.streaming.api.graph.StreamConfig.getOutEdgesInOrder(StreamConfig.java:398)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriters(StreamTask.java:1164)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:212)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:190)
    at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.<init>(TwoInputStreamTask.java:55)
    at sun.reflect.GeneratedConstructorAccessor11.newInstance(Unknown Source)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.flink.runtime.taskmanager.Task.loadAndInstantiateInvokable(Task.java:1405)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:689)
    at java.lang.Thread.run(Thread.java:748)
 Caused by: java.lang.ClassNotFoundException: eu.euranova.chng.PlanBuilder$$anon$1
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:129)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1868)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1975)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
    at java.util.ArrayList.readObject(ArrayList.java:797)
    at sun.reflect.GeneratedMethodAccessor17.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1170)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2178)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:566)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:552)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:540)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:501)
    at org.apache.flink.streaming.api.graph.StreamConfig.getOutEdgesInOrder(StreamConfig.java:395)
    ... 10 more

The class in question is indeed not in the jar (it is a Scala anonymous class).
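One way to confirm which `PlanBuilder` classes the fat jar actually contains is to list its entries. The following is a minimal sketch of a hypothetical helper (`JarInspector.classEntries` is not part of Flink) that relies only on `java.util.jar.JarFile`. If `PlanBuilder$$anon$1.class` is absent from its output, that matches the `ClassNotFoundException` in the stack trace above.

```scala
import java.io.File
import java.util.jar.JarFile

// Hypothetical diagnostic helper, not part of Flink or of the original job.
object JarInspector {
  // Returns the sorted .class entries whose path starts with the given
  // fully qualified class-name prefix, e.g. "eu.euranova.chng.PlanBuilder".
  def classEntries(jar: File, classPrefix: String): List[String] = {
    val pathPrefix = classPrefix.replace('.', '/')
    val jarFile = new JarFile(jar)
    try {
      val entries = jarFile.entries()
      var found = List.empty[String]
      while (entries.hasMoreElements) {
        val name = entries.nextElement().getName
        if (name.startsWith(pathPrefix) && name.endsWith(".class")) found = name :: found
      }
      found.sorted
    } finally jarFile.close()
  }

  // Usage: pass the path of the fat jar as the only argument.
  def main(args: Array[String]): Unit =
    classEntries(new File(args(0)), "eu.euranova.chng.PlanBuilder").foreach(println)
}
```

Passing `lead-0.1-jar-with-dependencies.jar` would print every compiled class whose name starts with `PlanBuilder` (including `PlanBuilder$` and any `PlanBuilder$$anon$N`).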

But... why does the job run fine when I start it manually, yet fail through the RestClusterClient with the exact same fat jar?

Also, with the Flink log level set to DEBUG, I see a lot of log lines like these in IntelliJ:

DEBUG org.apache.flink.streaming.api.graph.StreamGraphGenerator     - Transforming TwoInputTransformation{id=17, name='process-control-operator', outputType=Java Tuple2<scala.Tuple2(_1: String, _2: String), GenericType<eu.euranova.model.ShareEvent>>, parallelism=2}
DEBUG org.apache.flink.streaming.api.graph.StreamGraph              - CO-TASK: 17
DEBUG org.apache.flink.streaming.api.graph.StreamGraphGenerator     - Transforming SideOutputTransformation{id=21, name='SideOutput', outputType=Java Tuple2<scala.Tuple2(_1: String, _2: String), GenericType<eu.euranova.flink.streaming.control.event.ControlEvent>>, parallelism=2}
DEBUG org.apache.flink.streaming.api.graph.StreamGraphGenerator     - Transforming TwoInputTransformation{id=20, name='process-control-operator', outputType=Java Tuple2<scala.Tuple2(_1: String, _2: String), GenericType<eu.euranova.model.Row>>, parallelism=2}
DEBUG org.apache.flink.streaming.api.graph.StreamGraphGenerator     - Transforming TwoInputTransformation{id=19, name='Co-Map', outputType=Java Tuple2<scala.Tuple2(_1: String, _2: String), GenericType<eu.euranova.model.Row>>, parallelism=2}
DEBUG org.apache.flink.streaming.api.graph.StreamGraphGenerator     - Transforming PartitionTransformation{id=12, name='Partition', outputType=Java Tuple2<scala.Tuple2(_1: String, _2: String), eu.euranova.model.InstallEvent(userId: String, gameId: String, timestamp: Long, name: String, dumVar: Long)>, parallelism=2}
DEBUG org.apache.flink.streaming.api.graph.StreamGraph              - CO-TASK: 19
DEBUG org.apache.flink.streaming.api.graph.StreamGraph              - CO-TASK: 20
DEBUG org.apache.flink.streaming.api.graph.StreamGraphGenerator     - Transforming SideOutputTransformation{id=24, name='SideOutput', outputType=Java Tuple2<scala.Tuple2(_1: String, _2: String), GenericType<eu.euranova.flink.streaming.control.event.ControlEvent>>, parallelism=2}
DEBUG org.apache.flink.streaming.api.graph.StreamGraphGenerator     - Transforming TwoInputTransformation{id=23, name='process-control-operator', outputType=Java Tuple2<scala.Tuple2(_1: String, _2: String), GenericType<eu.euranova.model.Row>>, parallelism=2}
DEBUG org.apache.flink.streaming.api.graph.StreamGraph              - CO-TASK: 23

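...which seems to indicate that at least the "driver" runs locally while the tasks are sent to the cluster. The other explanation would be that the cluster sends all its logs back to the RestClusterClient, which seems unlikely to me.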
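If the job graph is indeed built in the local (IntelliJ) JVM, it may help to check where that JVM loaded the job's classes from. Below is a minimal sketch with a hypothetical `WhereLoaded` helper that relies only on `Class.getProtectionDomain.getCodeSource`, which is `null` for classes loaded by the bootstrap class loader. A location such as a `target/classes` directory instead of the fat jar would suggest that the local copy of a class might differ from the one packaged in the jar.

```scala
// Hypothetical diagnostic helper, not part of Flink or of the original job.
object WhereLoaded {
  // Location (directory or jar URL) the class was loaded from, if the JVM knows it.
  // Bootstrap classes such as java.lang.String have no code source, so this yields None.
  def location(cls: Class[_]): Option[String] =
    Option(cls.getProtectionDomain.getCodeSource).map(_.getLocation.toString)

  // Usage: pass a fully qualified class name, e.g. eu.euranova.chng.PlanBuilder,
  // and run this with the same classpath the IntelliJ launcher uses.
  def main(args: Array[String]): Unit = {
    val cls = Class.forName(args(0))
    println(location(cls).getOrElse("<no code source: bootstrap or unknown>"))
  }
}
```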

Do you know what is causing this? How can I make it work?

Thanks in advance for your help.

Best regards,

Laurent
