I'm trying to find compatible versions of the jars needed for a PySpark readStream from Kafka. I've explored many versions but can't find a combination that works. If I'm doing something wrong, please point it out.
My system configuration and the jars in use:
OS: OSX
pyspark==3.1.2
JAVA==1.8
SPARK-KAFKA-CLIENT==kafka-clients-3.0.0.jar
spark-sql-kafka==spark-sql-kafka-0-10_2.12-3.1.2.jar
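For reference, one alternative I considered but have not verified: requesting the connector by its Maven coordinate via spark.jars.packages, so that Spark resolves all transitive dependencies itself instead of me hand-picking local jars. A minimal sketch, assuming internet access when the session starts:

from pyspark.sql import SparkSession

# Let Spark/Ivy fetch the connector plus its transitive dependencies
# (kafka-clients, commons-pool2, the token-provider jar, ...).
# The trailing version should match the PySpark version, 3.1.2 here.
spark = SparkSession.builder \
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2") \
    .getOrCreate()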
Exception: I get an error about KafkaConfigUpdater and am not sure what the fix is.
21/10/05 20:08:22 ERROR MicroBatchExecution: Query qalerts [id = baabcb36-fde6-47fc-bd5f-2980224216e9, runId = 1ad1108a-f037-481a-b29d-5c69587cd4ae] terminated with error
java.lang.NoClassDefFoundError: org/apache/spark/kafka010/KafkaConfigUpdater
at org.apache.spark.sql.kafka010.KafkaSourceProvider$.kafkaParamsForDriver(KafkaSourceProvider.scala:579)
at org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan.toMicroBatchStream(KafkaSourceProvider.scala:465)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.$anonfun$applyOrElse$4(MicroBatchExecution.scala:104)
at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:97)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:82)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:318)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:74)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:318)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:171)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:169)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$3(TreeNode.scala:323)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:408)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:244)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:406)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:359)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:323)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:171)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:169)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:307)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:82)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:62)
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:326)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:317)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.kafka010.KafkaConfigUpdater
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 33 more
Exception in thread "stream execution thread for qalerts [id = baabcb36-fde6-47fc-bd5f-2980224216e9, runId = 1ad1108a-f037-481a-b29d-5c69587cd4ae]" java.lang.NoClassDefFoundError: org/apache/spark/kafka010/KafkaConfigUpdater
at org.apache.spark.sql.kafka010.KafkaSourceProvider$.kafkaParamsForDriver(KafkaSourceProvider.scala:579)
at org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan.toMicroBatchStream(KafkaSourceProvider.scala:465)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.$anonfun$applyOrElse$4(MicroBatchExecution.scala:104)
at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:97)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:82)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:318)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:74)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:318)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:171)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:169)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$3(TreeNode.scala:323)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:408)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:244)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:406)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:359)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:323)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:171)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:169)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:307)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:82)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:62)
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:326)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:317)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.kafka010.KafkaConfigUpdater
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 33 more
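If I'm reading the trace correctly, org.apache.spark.kafka010.KafkaConfigUpdater is in neither jar I pass in; as far as I can tell it lives in the separate spark-token-provider-kafka-0-10 artifact, and spark-sql-kafka-0-10 3.1.2 was built against older client libraries than kafka-clients 3.0.0. If local jars are the right approach, would the complete set look roughly like this (artifact versions are my guesses)?

# Hypothetical complete local jar set. The token-provider jar is what
# contains KafkaConfigUpdater; the kafka-clients and commons-pool2
# versions are my guesses at what Spark 3.1.2 was built against.
spark = SparkSession.builder \
    .config("spark.jars", ",".join([
        "../jar/spark-sql-kafka-0-10_2.12-3.1.2.jar",
        "../jar/spark-token-provider-kafka-0-10_2.12-3.1.2.jar",
        "../jar/kafka-clients-2.6.0.jar",
        "../jar/commons-pool2-2.6.2.jar",
    ])) \
    .getOrCreate()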
Sample code:
from pyspark.sql import SparkSession
import json

spark = SparkSession.builder \
    .config('spark.jars', '../jar/kafka-clients-3.0.0.jar,../jar/spark-sql-kafka-0-10_2.12-3.1.2.jar') \
    .getOrCreate()
print(f'spark version: {spark.version}')

# TOPIC, BOOTSTRAP_SERVERS and EH_SASL are defined elsewhere.
df = spark.readStream \
    .format("kafka") \
    .option("subscribe", TOPIC) \
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS) \
    .option("kafka.sasl.mechanism", "PLAIN") \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.jaas.config", EH_SASL) \
    .option("kafka.request.timeout.ms", "60000") \
    .option("kafka.session.timeout.ms", "60000") \
    .option("failOnDataLoss", "true") \
    .option("startingOffsets", "earliest") \
    .option("partition", 1) \
    .option("kafka.group.id", "grp1") \
    .load()
ds = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
alertQuery = ds \
    .writeStream \
    .queryName("qalerts") \
    .format("memory") \
    .start()
alerts = spark.sql("select * from qalerts")
pdAlerts = alerts.toPandas()
a = pdAlerts['value'].tolist()
d = []
for i in a:
    x = json.loads(i)
    d.append(x)
print(d)
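One side note on my own code: I query the memory sink right after start(), so the table may still be empty even with the right jars. Once the class-loading error is fixed I would block first, for example (test-only sketch, since this call blocks until all currently available input has been processed):

alertQuery.processAllAvailable()  # wait for the memory sink to catch up
alerts = spark.sql("select * from qalerts")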