0

我正在尝试从 spark 流应用程序中读取 kafka 流数据;在读取数据的过程中,我遇到了以下异常:

16/12/24 11:09:05 INFO storage.BlockManagerMaster: Registered BlockManager

Exception in thread "main" java.lang.NoClassDefFoundError: kafka/serializer/StringDecoder
    at com.inndata.RSVPSDataStreaming.KafkaToSparkStreaming.main(KafkaToSparkStreaming.java:69)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: kafka.serializer.StringDecoder
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 10 more

这是我的版本信息:

火花:1.6.2

卡夫卡:0.8.2

这是 pom.xml:

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.11 -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.8.2.1</version>
</dependency>
<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>1.6.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8_2.11 -->
<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>2.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.8.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8-assembly_2.10</artifactId>
    <version>2.0.0-preview</version>
</dependency>
4

2 回答 2

0

您使用不兼容和重复的工件版本。请记住,使用 Spark 时:

  • 所有 Scala 工件都必须使用相同的主要 Scala 版本(2.10、2.11)。
  • 所有 Spark 工件都必须使用相同的主要 Spark 版本(1.6、2.0)。

在构建定义中,您将spark-streaming1.6 与spark-core2.0 混合在一起,并为 Scala 2.10 包含重复项spark-streaming-kafka,而其余依赖项针对 Scala 2.11。

于 2016-12-24T11:26:39.377 回答
0

似乎需要隐式字符串编码器尝试应用这个

import org.apache.spark.sql.Encoder
    implicit val stringpEncoder = org.apache.spark.sql.Encoders.kryo[String]

你可以在这里找到更多关于编码器的信息,也可以从这里的官方文档中找到

于 2016-12-24T09:16:30.203 回答