I deployed my PyFlink job on YARN; the job consumes from Kafka.
My run command:
<pre>/opt/flink/bin/flink run -m yarn-cluster -yid application_1634021687380_0009 --jarfile=/opt/flink/lib/flink-sql-connector-kafka_2.11-1.13.2.jar -pyarch venv.zip -pyexec venv.zip/venv/bin/python -py demo.py</pre>
When I submit the job, it fails with this output:
<pre>SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/flink/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/hdp/3.1.0.0-78/hadoop/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
2021-10-13 14:41:14,387 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli [] - Found Yarn properties file under /tmp/.yarn-properties-flink.
2021-10-13 14:41:14,387 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli [] - Found Yarn properties file under /tmp/.yarn-properties-flink.
PID 32298: initializing main ...
Traceback (most recent call last):
  File "/opt/demo.py", line 92, in &lt;module&gt;
    GenUniqueIPAssest()
  File "/opt/demo.py", line 84, in GenUniqueIPAssest
    source_table.select(
  File "/opt/flink/opt/python/pyflink.zip/pyflink/table/table.py", line 1056, in execute_insert
  File "/opt/flink/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1285, in __call__
  File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 146, in deco
  File "/opt/flink/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o64.executeInsert.
: java.lang.NoClassDefFoundError: org/apache/kafka/common/errors/InvalidTxnStateException
    at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink.createKafkaProducer(KafkaDynamicSink.java:329)
    at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink.getSinkRuntimeProvider(KafkaDynamicSink.java:175)
    at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink.createSinkTransformation(CommonExecSink.java:118)
    at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:130)
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
    at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:70)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
    at scala.collection.Iterator.foreach(Iterator.scala:937)
    at scala.collection.Iterator.foreach$(Iterator.scala:937)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
    at scala.collection.IterableLike.foreach(IterableLike.scala:70)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike.map(TraversableLike.scala:233)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:69)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:165)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:740)
    at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:572)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.errors.InvalidTxnStateException
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 32 more
org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
    at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:134)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: java.lang.RuntimeException: Python process exits with code: 1
    at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
    ... 16 more
</pre>
My environment: JDK 1.8.0_211, Flink 1.13.2, CentOS 7.6.
The jars on my classpath (/opt/flink/lib): flink-connector-kafka_2.12-1.13.2.jar, flink-sql-connector-kafka_2.11-1.13.2.jar.
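To narrow this down, it helps to check which (if any) of these jars actually bundles the class from the stack trace. A jar is just a zip archive, so a small script can look for the class entry; this is a diagnostic sketch (the function name and paths are mine, not Flink's):

```python
import zipfile

def jar_contains_class(jar_path: str, class_name: str) -> bool:
    """Return True if the jar bundles the given fully-qualified class."""
    # A class com.example.Foo is stored in the jar as com/example/Foo.class.
    entry = class_name.replace(".", "/") + ".class"
    with zipfile.ZipFile(jar_path) as jar:
        return entry in jar.namelist()

# Example usage (paths from the question):
# jar_contains_class(
#     "/opt/flink/lib/flink-sql-connector-kafka_2.11-1.13.2.jar",
#     "org.apache.kafka.common.errors.InvalidTxnStateException",
# )
```

If neither jar reports True for the unshaded class name, the NoClassDefFoundError is consistent with the class simply not being on the user classpath at runtime.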
I suspect the job cannot find the Kafka classes at runtime, even though I declared the connector jar with --jarfile and shipped my venv with venv.zip. How can I fix this? Many thanks.
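For reference, PyFlink also supports declaring connector jars from inside the job via the documented `pipeline.jars` configuration key, instead of (or in addition to) the CLI flag. A sketch, assuming `table_env` is the TableEnvironment the job already creates (this is a config fragment, not a runnable program):

```python
# Sketch: declare the Kafka connector jar from within the PyFlink job.
# `table_env` is assumed to be the existing TableEnvironment; the path is
# the connector jar from the run command above.
table_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    "file:///opt/flink/lib/flink-sql-connector-kafka_2.11-1.13.2.jar",
)
```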