我有一个简单的 Apache Spark Structured Streaming python 代码,它从 Kafka 读取数据,并将消息写入控制台。
我已经设置了检查点位置,但是代码没有写入检查点..任何想法为什么?
这是代码:
from pyspark.sql import SparkSession, Window
spark = SparkSession.builder.appName('StructuredStreaming_KafkaProducer').getOrCreate()
# os.environ["SPARK_HOME"] = "/Users/karanalang/Documents/Technology/spark-3.2.0-bin-hadoop3.2"
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.13:3.2.0'
# kafkaBrokers='localhost:9092'
kafkaBrokers='<host>:<port>'
topic = "my-topic"
# bootstrap.servers=my-cluster-lb-ssl-cert-kafka-bootstrap:9093
security_protocol="SSL"
ssl_truststore_location="/Users/karanalang/Documents/Technology/strimzi/gcp_certs_nov28/ca.p12"
ssl_truststore_password="<pwd_1>"
ssl_keystore_location="/Users/karanalang/Documents/Technology/strimzi/gcp_certs_nov28/user.p12"
ssl_keystore_password="<pwd_2>"
consumerGroupId = "my-group"
spark.sparkContext.setLogLevel("ERROR")
df = spark.read.format('kafka')\
.option("kafka.bootstrap.servers",kafkaBrokers)\
.option("kafka.security.protocol","SSL") \
.option("kafka.ssl.truststore.location",ssl_truststore_location) \
.option("kafka.ssl.truststore.password",ssl_truststore_password) \
.option("kafka.ssl.keystore.location", ssl_keystore_location)\
.option("kafka.ssl.keystore.password", ssl_keystore_password)\
.option("subscribe", topic) \
.option("kafka.group.id", consumerGroupId)\
.option("startingOffsets", "earliest") \
.load()
query = df.selectExpr("CAST(value AS STRING)") \
.write \
.format("console") \
.option("numRows",100)\
.option("checkpointLocation", "~/PycharmProjects/Kafka/checkpoint/") \
.option("outputMode", "complete")\
.save("output")