我有一个在 spark 2.4.8 hadoop 2.6 build 上运行的 pyspark 脚本。
脚本是一个简单的带有 kafka 的结构化流。从主题中读取、提取、过滤、映射和写入另一个 kafka 主题。减少测试看起来像这样。
from pyspark.sql.types import StructType,StructField,StringType
from pyspark.sql.functions import from_json,col,expr,to_json,struct,trim,explode
spark = SparkSession \
.builder \
.appName("myapp") \
.getOrCreate()
df_schema = StructType([
StructField("field_one", StructType([
StructField("inner_one", StringType(), True),
StructField("inner_two", StringType(), True)
]), True),
StructField("field_two", StringType(), True)
])
log = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "MY_KAFKA_BROKERS") \
.option("subscribe", "MY_TOPIC") \
.load() \
.selectExpr("CAST(value AS STRING) as val") \
.select(from_json("val",df_schema).alias("all")) \
.select(col("all.*"))
extract = log \
.filter("field_one.inner_one = 'some_value'") \
.select(to_json(struct("field_one","field_two")).alias("value"))
result = extract.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "MY_KAFKA_BROKERS") \
.option("topic", "MY_TOPIC_TWO") \
.outputMode("append") \
.option("checkpointLocation", "hdfs:///user/me/spark/checkpoints/spark-test") \
.trigger(processingTime='5 seconds') \
.start()
result.awaitTermination()
工作正常。
最近我设置了一个新的火花安装。没有 hadoop 分发的 Sprark 3.2.0 通过 SPARK_DIST_CLASSPATH 添加到 spark 的类路径中的 hadoop 2.6 包。
我正在连接到同一个 hadoop。
这就是问题所在。Spark 抛出与检查点位置相关的异常。
Caused by: org.apache.hadoop.HadoopIllegalArgumentException: Uri without authority: hdfs:/user/me/spark/checkpoints/spark-test
更奇怪的是,当我删除协议
.option("checkpointLocation", "hdfs:///user/me/spark/checkpoints/spark-test") \
时
.option("checkpointLocation", "/user/me/spark/checkpoints/spark-test") \
脚本运行,甚至它在提供的位置检查点到 hdfs。
我的问题是 - spark 2.4.x 和 3.2.x 之间的结构化流中的检查点有什么变化?如果是这样,我如何为检查点提供其他文件系统?还是只是我的自定义 spark 安装问题 - spark 3 和 classpath 上提供的 hadoop 2.6 包?