我正在使用 spark 2.3 版并尝试将 spark 中的配置单元表读取为:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
df = spark.table("emp.emptable")
在这里,我将一个具有当前日期的新列从系统添加到现有数据框
import pyspark.sql.functions as F
newdf = df.withColumn('LOAD_DATE', F.current_date())
现在面临一个问题,当我尝试将此数据框写入配置单元表时
newdf.write.mode("overwrite").saveAsTable("emp.emptable")
pyspark.sql.utils.AnalysisException: u'Cannot overwrite table emp.emptable that is also being read from;'
所以我正在检查数据帧以打破血统,因为我正在从同一个数据帧读取和写入
checkpointDir = "/hdfs location/temp/tables/"
spark.sparkContext.setCheckpointDir(checkpointDir)
df = spark.table("emp.emptable").coalesce(1).checkpoint()
newdf = df.withColumn('LOAD_DATE', F.current_date())
newdf.write.mode("overwrite").saveAsTable("emp.emptable")
这样它就可以正常工作,并且新列已添加到配置单元表中。但是每次创建检查点文件时我都必须删除它。有没有最好的方法来打破血统并使用更新的列详细信息写入相同的数据帧并将其保存到 hdfs 位置或作为配置单元表。
或者有没有办法为检查点目录指定一个临时位置,在火花会话完成后将被删除。