3

我有一个 AWS 粘合作业 (PySpark),需要从大小为 350GB+ 的集中式数据湖加载数据,准备并加载到由两列分区的 s3 存储桶中。我注意到加载和写入一周的数据需要很长时间(甚至大约一天)。有几个月的数据需要写入。我尝试增加工作节点,但似乎无法解决问题。

我的胶水作业目前有 60 个 G.1x 工作节点。

我在代码中的 SparkConf 看起来像这样

conf = pyspark.SparkConf().setAll([

        ("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2"),

        ("spark.speculation", "false"),

        ("spark.sql.parquet.enableVectorizedReader", "false"),

        ("spark.sql.parquet.mergeSchema", "true"),

        ("spark.sql.crossJoin.enabled", "true"),

        ("spark.sql.sources.partitionOverwriteMode","dynamic"),

        ("spark.hadoop.fs.s3.maxRetries", "20"),

        ("spark.hadoop.fs.s3a.multiobjectdelete.enable", "false")

    ])


我相信它确实成功地将文件写入分区,但是删除它创建的所有临时 spark-staging 文件需要很长时间。当我检查任务时,这似乎需要大部分时间。

2021-04-22 03:08:50,558 INFO [Thread-6] s3n.S3NativeFileSystem (S3NativeFileSystem.java:rename(1355)): rename s3://<bucket-name>/etl/sessions/.spark-staging-8df58afd-d6b2-4ca0-8611-429125abe2ae/p_date=2020-12-16/geo=u1hm s3://<bucket-name>/etl/sessions/p_date=2020-12-16/geo=u1hm

我对 S3 的写入看起来像这样

finalDF.coalesce(50).write.partitionBy('p_date','geohash').save("s3://{bucket}/{basepath}/{table}/".format(bucket=args['DataBucket'], basepath='etl',

                                                         table='sessions'), format='parquet', mode="overwrite")

任何帮助都会得到帮助。

4

0 回答 0