5

我将 zip 文件存储在 Amazon s3 中,然后我有一个 Python 列表["s3://mybucket/file1.zip", ..., "s3://mybucket/fileN.zip"],我需要使用 Spark 集群解压缩所有这些文件,并将所有 CSV 文件存储到一个增量格式表中。我想知道比我目前的方法更快的处理方法:

1) 我有一个用于在我的 Python 列表中进行迭代的bucle 。

2) 我正在使用 Python Boto3 从 s3 获取 zip 文件s3.bucket.Object(file)

3)我正在使用下一个代码解压缩文件

import io
import boto3
import shutil
import zipfile
for file in ["s3://mybucket/file1.zip", ..., "s3://mybucket/fileN.zip"]:
    obj = s3.bucket.Object(file)
    with io.BytesIO(obj.get()["Body"].read()) as tf:
        tf.seek(0)
        with zipfile.ZipFile(tf, mode='r') as zipf:
            for subfile in zipf.namelist():
                zipf.extract(subfile, outputZip)
    dbutils.fs.cp("file:///databricks/driver/{0}".format(outputZip), "dbfs:" + outputZip, True)
    shutil.rmtree(outputZip)
    dbutils.fs.rm("dbfs:" + outputZip, True)

4)我的文件在驱动程序节点中解压缩,然后执行程序无法访问这些文件(我找不到这样做的方法)所以我将所有这些 csv 文件移动到 DBFS 使用dbutils.fs.cp()

5)我使用 Pyspark Dataframe 从 DBFS 读取所有 csv 文件,并将其写入 Delta 表

df = self.spark.read.option("header", "true").csv("dbfs:" + file) 
df.write.format("delta").save(path)

6) 我从 DBFS 和驱动节点中删除数据

因此,我当前的目标是在比我之前的过程更短的时间内将 zip 文件从 S3 摄取到 Delta 表中。我想我可以将其中一些过程并行化为 1) 步骤,我想避免复制到 DBFS 的步骤,因为我不需要在那里有数据,而且我需要在每次摄取后删除 CSV 文件到Delta Table 以避免驱动程序节点磁盘中的内存错误。有什么建议吗?

4

2 回答 2

1

您现在知道的 Zip 不是一种可拆分的压缩技术,没有与 zip 一起使用的内置编解码器。您可能会发现一些聪明的人已经编写了他们自己的 Spark zip 编解码器/数据源,但我还没有找到。

我最近收到的一些tips:

  1. aws cli 会将文件并行复制到本地节点(驱动程序或工作程序)。aws cp <src> <dst>运行比使用流式传输二进制文件快得多aws cp <src> -。aws cli /botodbutils.fs.cp()

  2. 使用for循环是非并行执行和低效率的一个好兆头。这将导致更低的吞吐量、更高的成本、更低的集群利用率。尽可能使用 Spark 和 Python 的功能。

  3. 您可以使用以下内容创建文件名的 Spark DataFrame:

df_files = spark.createDataFrame(dbutils.fs.ls("<s3 path to bucket and folder>"))

从此数据框中,您可以运行 pandas udf、.pipe()运算符、常规 UDF。

  1. 在一组工作人员中并行运行 spark.pipe()或 Pandas UDF,每个工作都在文件的路径上工作,让您获得更多的吞吐量,让您的集群保持忙碌。

  2. 尝试使用 PandasUDF 执行以下任一操作:复制-解压缩-复制回 s3,用 CSV 阅读器读取 b。复制-解压缩-读取-返回-in-a-pandas-udf。观察 Ganglia 指标以确保高利用率。

  3. 由于您正在写入 Delta 表,因此如果您必须有多个 spark 作业,则可以可靠地并行运行写入(追加)。[一份工作中的并行工人>>并行火花工作]

  4. 为您的 CSV 提供模式读取速度比读取速度快 2 倍.option("inferSchema","true")

于 2020-07-20T01:56:42.043 回答
1

好吧,多种可能的解决方案可能是:

  1. 您可以一起读取所有文件(如果架构允许),df=spark.read.csv("s3://mybucket") 并将数据帧写入为增量df.write.format("delta").save(path)
  2. 您可以在数据框中单独读取每个文件并直接附加到现有的增量表(即使它是空的),而无需将其存储在 DBFS 中。有关更多详细信息:https ://docs.databricks.com/delta/delta-batch.html#append-using-dataframes
  3. 您可以在数据框中单独读取每个文件并将其合并到现有的主数据框中。最后,您可以将主数据框编写为增量表。

选项 3 类似于:

    import io
    import boto3
    import shutil
    import zipfile
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("name").getOrCreate()

    schema = StructType([
    \\ YOUR DATA SCHMEA
    ])

    df = spark.createDataFrame([], schema)

    for file in ["s3://mybucket/file1.zip", ..., "s3://mybucket/fileN.zip"]:
        obj = s3.bucket.Object(file)
        with io.BytesIO(obj.get()["Body"].read()) as tf:
            tf.seek(0)
            with zipfile.ZipFile(tf, mode='r') as zipf:
                for subfile in zipf.namelist():
                    zipf.extract(subfile, outputZip)
        tempdf = spark.read.option("header", "true").csv(outputZip)
        df = df.union(tempdf)      

    df.write.format("delta").save(path)
于 2019-11-08T15:46:47.260 回答