0

我有一个在 spark 2.4.8 hadoop 2.6 build 上运行的 pyspark 脚本。

脚本是一个简单的带有 kafka 的结构化流。从主题中读取、提取、过滤、映射和写入另一个 kafka 主题。减少测试看起来像这样。

from pyspark.sql.types import StructType,StructField,StringType
from pyspark.sql.functions import from_json,col,expr,to_json,struct,trim,explode

spark = SparkSession \
    .builder \
    .appName("myapp") \
    .getOrCreate()

df_schema = StructType([
    StructField("field_one", StructType([
        StructField("inner_one", StringType(), True),
        StructField("inner_two", StringType(), True)
    ]), True),
    StructField("field_two", StringType(), True)
])

log = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "MY_KAFKA_BROKERS") \
    .option("subscribe", "MY_TOPIC") \
    .load() \
    .selectExpr("CAST(value AS STRING) as val") \
    .select(from_json("val",df_schema).alias("all")) \
    .select(col("all.*"))

extract = log \
    .filter("field_one.inner_one = 'some_value'") \
    .select(to_json(struct("field_one","field_two")).alias("value"))


result = extract.writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "MY_KAFKA_BROKERS") \
    .option("topic", "MY_TOPIC_TWO") \
    .outputMode("append") \
    .option("checkpointLocation", "hdfs:///user/me/spark/checkpoints/spark-test") \
    .trigger(processingTime='5 seconds') \
    .start()

result.awaitTermination()

工作正常。

最近我设置了一个新的火花安装。没有 hadoop 分发的 Sprark 3.2.0 通过 SPARK_DIST_CLASSPATH 添加到 spark 的类路径中的 hadoop 2.6 包。

我正在连接到同一个 hadoop。

这就是问题所在。Spark 抛出与检查点位置相关的异常。

Caused by: org.apache.hadoop.HadoopIllegalArgumentException: Uri without authority: hdfs:/user/me/spark/checkpoints/spark-test

更奇怪的是,当我删除协议 .option("checkpointLocation", "hdfs:///user/me/spark/checkpoints/spark-test") \.option("checkpointLocation", "/user/me/spark/checkpoints/spark-test") \

脚本运行,甚至它在提供的位置检查点到 hdfs。

我的问题是 - spark 2.4.x 和 3.2.x 之间的结构化流中的检查点有什么变化?如果是这样,我如何为检查点提供其他文件系统?还是只是我的自定义 spark 安装问题 - spark 3 和 classpath 上提供的 hadoop 2.6 包?

4

0 回答 0