0

这是我在 Flink 中使用 Kafka 的简单 Scala 程序:

import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer, ProducerRecord} 
import org.apache.flink.streaming.api.environment._ 
import org.apache.flink.streaming.connectors.kafka 
import org.apache.flink.streaming.connectors.kafka.api._ 
import org.apache.flink.streaming.util.serialization._ 
import org.apache.flink.api.common.typeinfo._ 

object TestKafka { 
  def main(args: Array[String]) { 
    val env = StreamExecutionEnvironment.getExecutionEnvironment 
    val stream = env 
      .addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema)) 
      .addSink(new KafkaSink[String]("localhost:2181", "test", new JavaDefaultStringSchema)) 

    env.execute("Test Kafka") 
  } 
} 

在 build.sbt 中:

libraryDependencies ++= Seq("org.apache.flink" % "flink-scala" % "0.9.0", "org.apache.flink" % "flink-clients" % "0.9.0") 
libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.2.1" 
libraryDependencies += "org.apache.flink" % "flink-streaming-scala" % "0.9.0" 
libraryDependencies += "org.apache.flink" % "flink-connector-kafka" % "0.9.0" exclude("org.apache.kafka", "kafka_${scala.binary.version}") 
libraryDependencies += "com.101tec" % "zkclient" % "0.5" 

我正在使用“sbt assembly”来构建一个胖 jar,因此目标 jar 文件应该包含所有内容。但是,运行目标jar文件时会出现错误:

java.lang.NoClassDefFoundError: org/I0Itec/zkclient/serialize/ZkSerializer 
        at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:66) 
        at kafka.javaapi.consumer.ZookeeperConsumerConnector.<init>(ZookeeperConsumerConnector.scala:69) 
        at kafka.consumer.Consumer$.createJavaConsumerConnector(ConsumerConnector.scala:105) 
        at kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerConnector.scala) 
        at org.apache.flink.streaming.connectors.kafka.api.KafkaSource.initializeConnection(KafkaSource.java:175) 
        at org.apache.flink.streaming.connectors.kafka.api.KafkaSource.open(KafkaSource.java:207) 
        at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:33) 
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:56) 
        at org.apache.flink.streaming.runtime.tasks.StreamTask.openOperator(StreamTask.java:158) 
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.invoke(SourceStreamTask.java:52) 
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) 
        at java.lang.Thread.run(Thread.java:745) 
Caused by: java.lang.ClassNotFoundException: org.I0Itec.zkclient.serialize.ZkSerializer 
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366) 
......... 

我已经把 zkclient-0.5.jar 放在 /lib 下。任何人都可以解释一下吗?

4

0 回答 0