我正在运行一个 spark 作业,其工作是扫描一个大文件并将其拆分为较小的文件。该文件采用 Json Lines 格式,我正在尝试按某个列(id)对其进行分区,并将每个分区作为单独的文件保存到 S3。文件大小约为 12 GB,但id大约有 500000 个不同的值。查询大约需要 15 个小时。我可以做些什么来提高性能?对于这样的任务,Spark 是不是一个糟糕的选择?请注意,我确实有权确保源为每个id的固定行数。
import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from awsglue.utils import getResolvedOptions
from awsglue.transforms import *
from pyspark.sql.functions import udf, substring, instr, locate
from datetime import datetime, timedelta
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
# Get parameters that were passed to the job
args = getResolvedOptions(sys.argv, ['INPUT_FOLDER', 'OUTPUT_FOLDER', 'ID_TYPE', 'DATASET_DATE'])
id_type = args["ID_TYPE"]
output_folder = "{}/{}/{}".format(args["OUTPUT_FOLDER"], id_type, args["DATASET_DATE"])
input_folder = "{}/{}/{}".format(args["INPUT_FOLDER"], id_type, args["DATASET_DATE"])
INS_SCHEMA = StructType([
StructField("camera_capture_timestamp", StringType(), True),
StructField(id_type, StringType(), True),
StructField("image_uri", StringType(), True)
])
data = spark.read.format("json").load(input_folder, schema=INS_SCHEMA)
data = data.withColumn("fnsku_1", F.col("fnsku"))
data.coalesce(1).write.partitionBy(["fnsku_1"]).mode('append').json(output_folder)
我也尝试过重新分区而不是合并。
我正在使用 AWS Glue