
I am trying to find a compatible set of jars for a PySpark readStream job. I have tried many versions but cannot find a compatible combination. Please let me know if I am doing something wrong.

My system configuration and the jars I am using:

OS: OSX
pyspark==3.1.2
JAVA==1.8
SPARK-KAFKA-CLIENT==kafka-clients-3.0.0.jar
spark-sql-kafka==spark-sql-kafka-0-10_2.12-3.1.2.jar

Exception: I get an error around KafkaConfigUpdater and am not sure what the fix is.

21/10/05 20:08:22 ERROR MicroBatchExecution: Query qalerts [id = baabcb36-fde6-47fc-bd5f-2980224216e9, runId = 1ad1108a-f037-481a-b29d-5c69587cd4ae] terminated with error
java.lang.NoClassDefFoundError: org/apache/spark/kafka010/KafkaConfigUpdater
    at org.apache.spark.sql.kafka010.KafkaSourceProvider$.kafkaParamsForDriver(KafkaSourceProvider.scala:579)
    at org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan.toMicroBatchStream(KafkaSourceProvider.scala:465)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.$anonfun$applyOrElse$4(MicroBatchExecution.scala:104)
    at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:97)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:82)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:318)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:74)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:318)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:171)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:169)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$3(TreeNode.scala:323)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:408)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:244)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:406)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:359)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:323)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:171)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:169)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:307)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:82)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:62)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:326)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:317)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.kafka010.KafkaConfigUpdater
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 33 more
Exception in thread "stream execution thread for qalerts [id = baabcb36-fde6-47fc-bd5f-2980224216e9, runId = 1ad1108a-f037-481a-b29d-5c69587cd4ae]" java.lang.NoClassDefFoundError: org/apache/spark/kafka010/KafkaConfigUpdater
    (same stack trace and cause as in the ERROR log above)
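For context on the stack trace above: `org.apache.spark.kafka010.KafkaConfigUpdater` does not live in `spark-sql-kafka-0-10` itself but in the separate `spark-token-provider-kafka-0-10` module, so passing only the two jars listed leaves it off the classpath. A minimal sketch of a manually assembled classpath; the file paths and the kafka-clients/commons-pool2 versions are assumptions taken from what the Spark 3.1.2 build declares:

from pyspark.sql import SparkSession

# spark-token-provider-kafka-0-10 is the jar that actually contains
# org.apache.spark.kafka010.KafkaConfigUpdater; the other versions below
# are assumed to match the dependencies declared by Spark 3.1.2.
jars = ",".join([
    "../jar/spark-sql-kafka-0-10_2.12-3.1.2.jar",
    "../jar/spark-token-provider-kafka-0-10_2.12-3.1.2.jar",
    "../jar/kafka-clients-2.6.0.jar",
    "../jar/commons-pool2-2.6.2.jar",
])

spark = SparkSession.builder.config("spark.jars", jars).getOrCreate()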

Sample code:

import json

from pyspark.sql import SparkSession

# Jars are passed manually from a local directory.
spark = SparkSession.builder \
  .config('spark.jars', '../jar/kafka-clients-3.0.0.jar,../jar/spark-sql-kafka-0-10_2.12-3.1.2.jar') \
  .getOrCreate()
print(f'spark version: {spark.version}')
df = spark.readStream\
  .format("kafka")\
  .option("subscribe", TOPIC)\
  .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)\
  .option("kafka.sasl.mechanism", "PLAIN")\
  .option("kafka.security.protocol", "SASL_SSL")\
  .option("kafka.sasl.jaas.config", EH_SASL)\
  .option("kafka.request.timeout.ms", "60000")\
  .option("kafka.session.timeout.ms", "60000")\
  .option("failOnDataLoss", "true")\
  .option("startingOffsets", "earliest") \
  .option("partition", 1) \
  .option("kafka.group.id", "grp1") \
  .load()
ds = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
alertQuery = ds \
          .writeStream \
          .queryName("qalerts")\
          .format("memory")\
          .start()

alerts = spark.sql("select * from qalerts")
pdAlerts = alerts.toPandas()
a = pdAlerts['value'].tolist()

d = []
for i in a:
    x = json.loads(i)
    d.append(x)
print(d)
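Separately from the classpath problem: `spark.sql("select * from qalerts")` runs immediately after `start()`, so the in-memory table may still be empty at that point. A minimal sketch that waits for the currently available input first, using the `StreamingQuery.processAllAvailable()` testing API:

alertQuery = ds.writeStream \
    .queryName("qalerts") \
    .format("memory") \
    .start()

# Block until all data available in the source at this moment has been
# processed into the in-memory sink (intended for testing only; it can
# block for a long time if the source keeps receiving data).
alertQuery.processAllAvailable()

alerts = spark.sql("select * from qalerts")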

1 Answer


You can try the following:

  • Create a brand-new Python virtual environment and activate it.

  • Install pyspark in that environment; let's assume you are on version 3.1.2 (a quick version check is sketched right after this list).

  • Start with only the minimal code below.
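Before running it, a quick sanity check that the fresh environment resolves the expected build; the Maven coordinate in the snippet below must match this version exactly:

# The Scala build (2.12) and version (3.1.2) in the packages coordinate
# must match the installed PySpark distribution.
import pyspark
print(pyspark.__version__)   # expected: 3.1.2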

Code:

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# spark.jars.packages resolves the connector and its transitive
# dependencies (spark-token-provider-kafka-0-10, kafka-clients,
# commons-pool2) from Maven, unlike a hand-maintained spark.jars list.
spark = SparkSession.builder.master("local[2]") \
  .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2") \
  .getOrCreate()


df = spark.readStream\
  .format("kafka")\
  .option("subscribe", TOPIC)\
  .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)\
  .option("kafka.sasl.mechanism", "PLAIN")\
  .option("kafka.security.protocol", "SASL_SSL")\
  .option("kafka.sasl.jaas.config", EH_SASL)\
  .option("kafka.request.timeout.ms", "60000")\
  .option("kafka.session.timeout.ms", "60000")\
  .option("failOnDataLoss", "true")\
  .option("startingOffsets", "earliest") \
  .option("partition", 1) \
  .option("kafka.group.id", "grp1") \
  .load()

ds = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

ds.writeStream \
  .trigger(processingTime='5 seconds') \
  .outputMode("update") \
  .format("console") \
  .option("truncate", "false") \
  .start()

spark.streams.awaitAnyTermination()

This should download the correct dependencies. If you still get an error, please share the stack trace.
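If the session is created from a plain Python process rather than via spark-submit, the same coordinate can also be injected through the PYSPARK_SUBMIT_ARGS environment variable; a minimal sketch, assuming it is set before the first SparkSession is built:

import os

# Must be set before any SparkSession/SparkContext exists in the process;
# the trailing 'pyspark-shell' token is required by PySpark's launcher.
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 pyspark-shell"
)

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").getOrCreate()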

answered 2021-10-26T17:12:36.700