0

我有一个长期运行的火花流工作。执行时间逐渐线性增加,在 60 分钟内处理时间从 40 秒增加到 90 秒。

这种增加发生在 HDFS 写入语句中:

def write_checkpoint(self, df, event_name, no_partition_save=None, no_partition_read=None, partition_by=None, cache=True):

    hdfs_path = self.get_next_checkpoint_path(event_name)  # rotate from the previous output
    if no_partition_save:
        # coalesce instead of repartition can have unwanted behaviour
        # https://stackoverflow.com/questions/38961251/java-lang-outofmemoryerror-unable-to-acquire-100-bytes-of-memory-got-0
        df.repartition(no_partition_save) \
            .write \
            .mode("overwrite") \
            .save(hdfs_path)
    elif partition_by:
        df.write \
            .partitionBy(partition_by) \
            .mode("overwrite") \
            .save(hdfs_path)
    else:
        df \
            .write \
            .mode("overwrite") \
            .save(hdfs_path)
    
    if no_partition_read:
        df_new = self.spark.read.load(hdfs_path).repartition(no_partition_read)
    else:
        df_new = self.spark.read.load(hdfs_path)

    if partition_by:
        df_new = df.repartition(partition_by)
    if cache:
        df_new.cache()
    return df_new


应用程序启动时,此保存操作需要 1-2 秒。

随着时间的推移,任务本身保持 2 秒(第一张图片,1 个完成的阶段,耗时 2 秒),但整个查询持续时间急剧增加(第二张图片,总时间 40 秒)。

剩余 2 秒的任务

作业时间增加到 40 秒

我还在 python 中输入了一些日志记录,在同一操作中我可以看到瓶颈: 在此处输入图像描述

这可能是什么原因?

4

0 回答 0