
I have a simple Apache Spark Structured Streaming Python job that reads data from Kafka and writes the messages to the console.

I have set a checkpoint location, but the job does not write anything to the checkpoint directory. Any idea why?

Here is the code:

from pyspark.sql import SparkSession, Window


spark = SparkSession.builder.appName('StructuredStreaming_KafkaProducer').getOrCreate()
# os.environ["SPARK_HOME"] = "/Users/karanalang/Documents/Technology/spark-3.2.0-bin-hadoop3.2"
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.13:3.2.0'

# kafkaBrokers='localhost:9092'
kafkaBrokers='<host>:<port>'
topic = "my-topic"
# bootstrap.servers=my-cluster-lb-ssl-cert-kafka-bootstrap:9093
security_protocol="SSL"
ssl_truststore_location="/Users/karanalang/Documents/Technology/strimzi/gcp_certs_nov28/ca.p12"
ssl_truststore_password="<pwd_1>"
ssl_keystore_location="/Users/karanalang/Documents/Technology/strimzi/gcp_certs_nov28/user.p12"
ssl_keystore_password="<pwd_2>"
consumerGroupId = "my-group"

spark.sparkContext.setLogLevel("ERROR")

df = spark.read.format('kafka')\
    .option("kafka.bootstrap.servers",kafkaBrokers)\
    .option("kafka.security.protocol","SSL") \
    .option("kafka.ssl.truststore.location",ssl_truststore_location) \
    .option("kafka.ssl.truststore.password",ssl_truststore_password) \
    .option("kafka.ssl.keystore.location", ssl_keystore_location)\
    .option("kafka.ssl.keystore.password", ssl_keystore_password)\
    .option("subscribe", topic) \
    .option("kafka.group.id", consumerGroupId)\
    .option("startingOffsets", "earliest") \
    .load()

query = df.selectExpr("CAST(value AS STRING)") \
    .write \
    .format("console") \
    .option("numRows",100)\
    .option("checkpointLocation", "~/PycharmProjects/Kafka/checkpoint/") \
    .option("outputMode", "complete")\
    .save("output")

1 Answer


According to the official documentation, the checkpoint location must be a path in an HDFS-compatible file system, and it can be set as an option on the DataStreamWriter when starting the query. In your code, however, it is a local path on the driver, so it will be lost after the streaming job is restarted.

For details, see https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing
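
For reference, here is a minimal sketch of how the checkpoint location is typically passed to the DataStreamWriter when the query is started. The GCS bucket path, app name, and output mode below are placeholder assumptions, not taken from your post:

# Minimal sketch: streaming read from Kafka, console sink, with the checkpoint
# location pointing at an HDFS-compatible file system (the gs:// path below is
# a placeholder assumption).
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("StructuredStreaming_KafkaConsumer").getOrCreate()

# Streaming source: readStream instead of a batch read
stream_df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "<host>:<port>") \
    .option("subscribe", "my-topic") \
    .option("startingOffsets", "earliest") \
    .load()

# Streaming sink: writeStream returns a DataStreamWriter, where
# checkpointLocation is set as an option before start()
query = stream_df.selectExpr("CAST(value AS STRING)") \
    .writeStream \
    .format("console") \
    .outputMode("append") \
    .option("checkpointLocation", "gs://<bucket>/checkpoints/my-topic/") \
    .start()

query.awaitTermination()

Once the query is started this way, Spark maintains offset and commit metadata under the checkpoint directory, which is what allows the query to resume from where it left off after a restart.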

Answered on 2022-02-08T00:55:04.873