我将 zip 文件存储在 Amazon s3 中,然后我有一个 Python 列表["s3://mybucket/file1.zip", ..., "s3://mybucket/fileN.zip"]
,我需要使用 Spark 集群解压缩所有这些文件,并将所有 CSV 文件存储到一个增量格式表中。我想知道比我目前的方法更快的处理方法:
1) 我有一个用于在我的 Python 列表中进行迭代的bucle 。
2) 我正在使用 Python Boto3 从 s3 获取 zip 文件s3.bucket.Object(file)
3)我正在使用下一个代码解压缩文件
import io
import boto3
import shutil
import zipfile
for file in ["s3://mybucket/file1.zip", ..., "s3://mybucket/fileN.zip"]:
obj = s3.bucket.Object(file)
with io.BytesIO(obj.get()["Body"].read()) as tf:
tf.seek(0)
with zipfile.ZipFile(tf, mode='r') as zipf:
for subfile in zipf.namelist():
zipf.extract(subfile, outputZip)
dbutils.fs.cp("file:///databricks/driver/{0}".format(outputZip), "dbfs:" + outputZip, True)
shutil.rmtree(outputZip)
dbutils.fs.rm("dbfs:" + outputZip, True)
4)我的文件在驱动程序节点中解压缩,然后执行程序无法访问这些文件(我找不到这样做的方法)所以我将所有这些 csv 文件移动到 DBFS 使用dbutils.fs.cp()
5)我使用 Pyspark Dataframe 从 DBFS 读取所有 csv 文件,并将其写入 Delta 表
df = self.spark.read.option("header", "true").csv("dbfs:" + file)
df.write.format("delta").save(path)
6) 我从 DBFS 和驱动节点中删除数据
因此,我当前的目标是在比我之前的过程更短的时间内将 zip 文件从 S3 摄取到 Delta 表中。我想我可以将其中一些过程并行化为 1) 步骤,我想避免复制到 DBFS 的步骤,因为我不需要在那里有数据,而且我需要在每次摄取后删除 CSV 文件到Delta Table 以避免驱动程序节点磁盘中的内存错误。有什么建议吗?