我有一个长期运行的火花流工作。执行时间逐渐线性增加,在 60 分钟内处理时间从 40 秒增加到 90 秒。
这种增加发生在 HDFS 写入语句中:
def write_checkpoint(self, df, event_name, no_partition_save=None, no_partition_read=None, partition_by=None, cache=True):
hdfs_path = self.get_next_checkpoint_path(event_name) # rotate from the previous output
if no_partition_save:
# coalesce instead of repartition can have unwanted behaviour
# https://stackoverflow.com/questions/38961251/java-lang-outofmemoryerror-unable-to-acquire-100-bytes-of-memory-got-0
df.repartition(no_partition_save) \
.write \
.mode("overwrite") \
.save(hdfs_path)
elif partition_by:
df.write \
.partitionBy(partition_by) \
.mode("overwrite") \
.save(hdfs_path)
else:
df \
.write \
.mode("overwrite") \
.save(hdfs_path)
if no_partition_read:
df_new = self.spark.read.load(hdfs_path).repartition(no_partition_read)
else:
df_new = self.spark.read.load(hdfs_path)
if partition_by:
df_new = df.repartition(partition_by)
if cache:
df_new.cache()
return df_new
应用程序启动时,此保存操作需要 1-2 秒。
随着时间的推移,任务本身保持 2 秒(第一张图片,1 个完成的阶段,耗时 2 秒),但整个查询持续时间急剧增加(第二张图片,总时间 40 秒)。
我还在 python 中输入了一些日志记录,在同一操作中我可以看到瓶颈:
这可能是什么原因?