0

我为 SparkContext 添加了检查点,并为长期 Spark 结构化流作业的 kafka 数据流编写查询。

spark.sparkContext.setCheckpointDir("/tmp/checkpoint")

...

val monitoring_stream = monitoring_df.writeStream
                              .trigger(Trigger.ProcessingTime("120 seconds"))
                              .option("checkpointLocation", "s3a://spark-checkpoint/checkpointfiles")
                             .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
                                if(!batchDF.isEmpty) 
                                {
                                }
                             .start()
                             .awaitTermination()

spark作业运行稳定。但是,我注意到检查点文件在 HDFS 和 S3 中累积,没有自动清理。我看到存储空间不断被这些文件吃掉。是否有某种方法可以配置这些检查点文件的保留时间以使其自动删除?还是我需要运行一些 cron 作业来手动删除它们?如果我手动删除它们,是否会影响正在进行的 Spark 作业?谢谢!

4

1 回答 1

2

spark.cleaner.referenceTracking.cleanCheckpoints需要设置为true,默认为false。

于 2020-09-27T17:21:07.173 回答