2

我正在使用 spark 2.3 版并尝试将 spark 中的配置单元表读取为:

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
df = spark.table("emp.emptable")

在这里,我将一个具有当前日期的新列从系统添加到现有数据框

import pyspark.sql.functions as F
newdf = df.withColumn('LOAD_DATE', F.current_date())

现在面临一个问题,当我尝试将此数据框写入配置单元表时

newdf.write.mode("overwrite").saveAsTable("emp.emptable")

pyspark.sql.utils.AnalysisException: u'Cannot overwrite table emp.emptable that is also being read from;'

所以我正在检查数据帧以打破血统,因为我正在从同一个数据帧读取和写入

checkpointDir = "/hdfs location/temp/tables/"
spark.sparkContext.setCheckpointDir(checkpointDir)
df = spark.table("emp.emptable").coalesce(1).checkpoint()
newdf = df.withColumn('LOAD_DATE', F.current_date())
newdf.write.mode("overwrite").saveAsTable("emp.emptable")

这样它就可以正常工作,并且新列已添加到配置单元表中。但是每次创建检查点文件时我都必须删除它。有没有最好的方法来打破血统并使用更新的列详细信息写入相同的数据帧并将其保存到 hdfs 位置或作为配置单元表。

或者有没有办法为检查点目录指定一个临时位置,在火花会话完成后将被删除。

4

1 回答 1

2

正如我们在这篇文章中所讨论的,设置以下属性是可行的。

spark.conf.set("spark.cleaner.referenceTracking.cleanCheckpoints", "true")

这个问题有不同的背景。我们想保留checkpointed数据集,所以不关心添加清理解决方案。

设置上述属性有时会起作用(经过测试的 scala、java 和 python),但很难依赖它。官方文件说,通过设置这个属性,Controls whether to clean checkpoint files if the reference is out of scope.我不知道它到底意味着什么,因为我的理解是,一旦 spark 会话/上下文停止,它应该清理它。如果有人可以照亮它,那就太好了。

关于

有没有最好的方法来打破血统

检查这个问题,@BiS 找到了一些使用方法来切割血统的createDataFrame(RDD, Schema)方法。虽然我自己没有测试过。

仅供参考,我通常不依赖上述属性并删除checkpointed代码本身中的目录以确保安全。

我们可以得到如下checkpointed目录:

斯卡拉:

//Set directory
scala> spark.sparkContext.setCheckpointDir("hdfs:///tmp/checkpoint/")

scala> spark.sparkContext.getCheckpointDir.get
res3: String = hdfs://<name-node:port>/tmp/checkpoint/625034b3-c6f1-4ab2-9524-e48dfde589c3

//It gives String so we can use org.apache.hadoop.fs to delete path 

PySpark:

// Set directory
>>> spark.sparkContext.setCheckpointDir('hdfs:///tmp/checkpoint')
>>> t = sc._jsc.sc().getCheckpointDir().get()
>>> t 
u'hdfs://<name-node:port>/tmp/checkpoint/dc99b595-f8fa-4a08-a109-23643e2325ca'

# notice 'u' at the start which means It returns unicode object use str(t)
# Below are the steps to get hadoop file system object and delete

>>> fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())
fs.exists(sc._jvm.org.apache.hadoop.fs.Path(str(t)))
True

>>> fs.delete(sc._jvm.org.apache.hadoop.fs.Path(str(t)))
True

>>> fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())
fs.exists(sc._jvm.org.apache.hadoop.fs.Path(str(t)))
False

于 2019-09-09T21:28:43.627 回答