I have an AWS Glue job whose task is very simple: split large gzipped CSV files into 1 GB files.

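In my test, I uploaded 4 files to the bucket, each about 5 GB. However, the job always assigns all of the files to a single worker instead of spreading them across all workers.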

Log from the active worker:

[Executor task launch worker for task 3] s3n.S3NativeFileSystem (S3NativeFileSystem.java:open(1323)): Opening 's3://input/IN-4.gz' for reading
[Executor task launch worker for task 0] s3n.S3NativeFileSystem (S3NativeFileSystem.java:open(1323)): Opening 's3://input/IN-1.gz' for reading
[Executor task launch worker for task 2] s3n.S3NativeFileSystem (S3NativeFileSystem.java:open(1323)): Opening 's3://input/IN-3.gz' for reading
[Executor task launch worker for task 1] s3n.S3NativeFileSystem (S3NativeFileSystem.java:open(1323)): Opening 's3://input/IN-2.gz' for reading
[Executor task launch worker for task 0] zlib.ZlibFactory (ZlibFactory.java:<clinit>(49)): Successfully loaded & initialized native-zlib librar

Log from one of the idle workers:

storage.BlockManager (Logging.scala:logInfo(54)): Initialized BlockManager: BlockManagerId(3, 172.31.0.109, 35849, None)

The remaining workers stay stuck at this step, waiting indefinitely, while all 20 GB of files are handled by the single active worker.

The job script is as follows:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "crawled-database", table_name = "input", transformation_ctx = "datasource0", additional_options = {"groupFiles": "inPartition", "compressionType": "gzip"})

applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [ ("tagids", "string", "internal_tagids", "string"), ("channel", "long", "internal_channel", "long")], transformation_ctx = "applymapping1")

datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://glue-report-staging", "groupFiles": "inPartition", "groupSize": "1073741824", "compression": "gzip"}, format = "csv", transformation_ctx = "datasink2")
job.commit()

1 Answer


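The compression format used on the files is the cause of the skewed performance and parallelism. Gzipped files are not splittable: a reader has to decompress each file from its first byte, so a single .gz file cannot be divided among several tasks. That is why all of the work is assigned to one executor. If you were reading this in plain Spark, you could read it as an RDD and repartition it into smaller tasks/partitions.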
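To make the "not splittable" point concrete, here is a minimal sketch using only the Python standard library. It is not Glue or Spark code; the in-memory payload and the helper name `try_decode` are made up for illustration. It shows that a reader handed only the second half of a gzip stream cannot decode it on its own, which is the situation a second task would be in if a .gz file were split.

```python
import gzip
import zlib

# Build a small gzip payload in memory (a stand-in for one of the input files).
rows = "".join(f"{i},tag{i},channel{i % 7}\n" for i in range(50_000)).encode()
blob = gzip.compress(rows)


def try_decode(chunk: bytes) -> bool:
    """Return True if the chunk can be decompressed on its own."""
    try:
        gzip.decompress(chunk)
        return True
    except (OSError, EOFError, zlib.error):
        # Missing header, truncated stream, or undecodable deflate data.
        return False


whole_ok = try_decode(blob)                          # full stream, read from byte 0
second_half_ok = try_decode(blob[len(blob) // 2:])   # what a "second split" would see
print(whole_ok, second_half_ok)
```

Formats that are splittable (uncompressed CSV, or container formats with independent blocks) avoid this, because a task can start reading at a block boundary.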

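Dynamic frames have the same option. You can use datasource0.repartition(n), where n is the desired number of partitions. On G.1X, each worker can run 8 tasks in parallel, so you can size n according to the number of workers you need to keep busy in order to maximize parallelism. You can find more information here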
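As a rough sketch of the sizing arithmetic above: the helper `target_partitions` and the worker count of 10 are hypothetical, and the default of 8 tasks per worker is simply the G.1X figure quoted in this answer, so check it against your own worker type and Glue version.

```python
def target_partitions(num_workers: int, tasks_per_worker: int = 8) -> int:
    """Partition count that gives every task slot at least one partition.

    tasks_per_worker=8 is the G.1X figure quoted in the answer; treat it as
    an assumption and adjust it for your worker type and Glue version.
    """
    if num_workers < 1 or tasks_per_worker < 1:
        raise ValueError("num_workers and tasks_per_worker must be positive")
    return num_workers * tasks_per_worker


# Hypothetical job configured with 10 workers.
n = target_partitions(num_workers=10)

# In the job script this would be applied right after the read, e.g.:
#   datasource0 = datasource0.repartition(n)
print(n)
```

Keep in mind that each .gz file is still decompressed by a single task; repartitioning only spreads the work that happens after the read (the mapping and the write) across the other workers.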

answered 2021-09-02T20:18:03.373