2

我正在尝试运行一个 zeppelin 笔记本,其中包含带有 Kafka 连接器的 spark 结构化流示例。

>kafka is up and running on localhost port 9092 

>from zeppelin notebook, sc.version returns String = 2.0.2

这是我的环境:

kafka: kafka_2.10-0.10.1.0

zeppelin: zeppelin-0.6.2-bin-all

spark: spark-2.0.2-bin-hadoop2.7

这是我的飞艇笔记本中的代码:

import org.apache.enter code herespark.sql.functions.{explode, split}


// Setup connection to Kafka val kafka = spark.readStream  
.format("kafka")   
.option("kafka.bootstrap.servers","localhost:9092")   
// comma separated list of broker:host  
.option("subscribe", "twitter")    
// comma separated list of topics 
.option("startingOffsets", "latest") 
// read data from the end of the stream   .load()

这是我运行笔记本时遇到的错误:

import org.apache.spark.sql.functions.{explode, split} java.lang.ClassNotFoundException:找不到数据源:kafka。请在https://cwiki.apache.org/confluence/display/SPARK/Third+Party+Projects找到软件包 在 org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource(DataSource.scala:148) 在 org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:79) 在 org .apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:79) 在 org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:218) 在 org.apache.spark .sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:80) at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:80) at org.apache.spark.sql .execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30) at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:124) ... 86 被忽略 原因:java.lang.ClassNotFoundException : 卡夫卡。DefaultSource 在 scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:62) 在 java.lang.ClassLoader.loadClass(ClassLoader.java:424) 在 java.lang.ClassLoader.loadClass(ClassLoader.java:357)在 org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5$$anonfun$apply$1.apply(DataSource.scala:132) 在 org.apache.spark.sql.execution.datasources.DataSource$$anonfun $5$$anonfun$apply$1.apply(DataSource.scala:132) 在 scala.util.Try$.apply(Try.scala:192)132)在 org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5$$anonfun$apply$1.apply(DataSource.scala:132) 在 scala.util.Try$.apply(Try.scala:192 )132)在 org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5$$anonfun$apply$1.apply(DataSource.scala:132) 在 scala.util.Try$.apply(Try.scala:192 )

任何帮助建议将不胜感激。

谢谢

4

1 回答 1

1

您可能已经想通了,但是为其他人提供答案,您必须将以下内容添加到 zeppelin-env.sh.j2

SPARK_SUBMIT_OPTIONS=--packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.1.0

如果您使用的是 kafka 客户端,则可能还有其他依赖项:

--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0,org.apache.spark:spark-sql_2.11:2.1.0,org.apache.kafka:kafka_2.11:0.10.0.1,org.apache.spark:spark-streaming-kafka-0-10_2.11:2.1.0,org.apache.kafka:kafka-clients:0.10.0.1
于 2017-02-24T18:50:02.497 回答