0

我有一段代码基本上执行以下操作:

df=spark.read.parquet("very large dataset")
df.select(columns)
df.filter("some rows I dont want")

df2=df.groupBy('keys').agg("max of a column")
df=df.drop("columns that will be got from df2")
df=df.join(df2, on=["key cols"], "left")

spark.sparkContext.setCheckpointDir("checkpoint/path")
df3=df.checkpoint()

df4=df3.filter("condition 1").groupBy('key').agg("perform aggregations")
df5=df3.filter("condition 2").select(certain columns).alias(rename them)

df6=df4.join(df5, on=["key cols"], how="outer") #perform full outer join to get all columns and rows

此时,我收到以下错误:

已解决的属性 UL#28099 在运算符 !Project [ SEQ_ID#27907, TOOL_ID#27908, TIME_STAMP#27909, DATE#27910, RESULT#27911, UL#28099, cast(CASE WHEN isnull(LL#27913) THEN -Infinity ELSE LL#27913 END as double) AS LL#27246, UW#27914,LW#27915]。具有相同名称的属性出现在操作中:UL。请检查是否使用了正确的属性。;;\n加入 FullOuter\n

但是,当我删除checkpoint并像正常缓存的数据帧一样运行它时,它可以正常工作。如果我的数据集很小,这没问题,但我需要检查点,因为与可用的 EMR 资源相比,我的数据集非常大。

有没有人遇到过类似的问题?

4

0 回答 0