
I deployed my PyFlink job on YARN; it consumes from Kafka.

My run command:

/opt/flink/bin/flink run -m yarn-cluster -yid application_1634021687380_0009 --jarfile=/opt/flink/lib/flink-sql-connector-kafka_2.11-1.13.2.jar -pyarch venv.zip -pyexec venv.zip/venv/bin/python -py demo.py
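Besides passing the connector jar on the command line with `--jarfile`, PyFlink 1.13 also lets the job declare its jar dependencies itself through the `pipeline.jars` configuration option. A minimal sketch of how I understand that alternative (the jar path is the one from my setup; adjust as needed):

```python
# Sketch: declaring the Kafka connector jar from inside the PyFlink job,
# as an alternative to -j/--jarfile on the flink run command line.
from pyflink.table import EnvironmentSettings, TableEnvironment

env_settings = (EnvironmentSettings.new_instance()
                .in_streaming_mode()
                .use_blink_planner()
                .build())
t_env = TableEnvironment.create(env_settings)

# The value must be a file:// URL; multiple jars are separated by ';'.
t_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    "file:///opt/flink/lib/flink-sql-connector-kafka_2.11-1.13.2.jar")
```

This is a configuration fragment rather than a complete job; the Kafka source/sink DDL and the rest of the pipeline would follow as in demo.py.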

When I submit the job, I get this error output:

  <pre>SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/flink/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/hdp/3.1.0.0-78/hadoop/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
2021-10-13 14:41:14,387 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli [] - Found Yarn properties file under /tmp/.yarn-properties-flink.
2021-10-13 14:41:14,387 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli [] - Found Yarn properties file under /tmp/.yarn-properties-flink.
PID 32298: initializing main ...
Traceback (most recent call last):
  File "/opt/demo.py", line 92, in <module>
    GenUniqueIPAssest()
  File "/opt/demo.py", line 84, in GenUniqueIPAssest
    source_table.select(
  File "/opt/flink/opt/python/pyflink.zip/pyflink/table/table.py", line 1056, in execute_insert
  File "/opt/flink/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1285, in __call__
  File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 146, in deco
  File "/opt/flink/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o64.executeInsert.
: java.lang.NoClassDefFoundError: org/apache/kafka/common/errors/InvalidTxnStateException
        at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink.createKafkaProducer(KafkaDynamicSink.java:329)
        at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSink.getSinkRuntimeProvider(KafkaDynamicSink.java:175)
        at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink.createSinkTransformation(CommonExecSink.java:118)
        at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:130)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:134)
        at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:70)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
        at scala.collection.Iterator.foreach(Iterator.scala:937)
        at scala.collection.Iterator.foreach$(Iterator.scala:937)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
        at scala.collection.IterableLike.foreach(IterableLike.scala:70)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike.map(TraversableLike.scala:233)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:69)
        at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:165)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:740)
        at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:572)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
        at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.errors.InvalidTxnStateException
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 32 more

org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
        at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:134)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
        at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: java.lang.RuntimeException: Python process exits with code: 1
        at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
        ... 16 more
</pre>

My environment: JDK 1.8.0_211, Flink 1.13.2, CentOS 7.6.

My jars on the classpath (/opt/flink/lib): flink-connector-kafka_2.12-1.13.2.jar and flink-sql-connector-kafka_2.11-1.13.2.jar.

I suspect the Kafka classes cannot be found in my environment, even though I declared the connector jar with --jarfile and the Python environment with venv.zip. How can I fix this? Many thanks.

