1

我正在运行一个 spark 作业,其工作是扫描一个大文件并将其拆分为较小的文件。该文件采用 Json Lines 格式,我正在尝试按某个列(id)对其进行分区,并将每个分区作为单独的文件保存到 S3。文件大小约为 12 GB,但id大约有 500000 个不同的值。查询大约需要 15 个小时。我可以做些什么来提高性能?对于这样的任务,Spark 是不是一个糟糕的选择?请注意,我确实有权确保源为每个id的固定行数。

import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from awsglue.utils import getResolvedOptions
from awsglue.transforms import *
from pyspark.sql.functions import udf, substring, instr, locate
from datetime import datetime, timedelta

    
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
# Get parameters that were passed to the job
args = getResolvedOptions(sys.argv, ['INPUT_FOLDER', 'OUTPUT_FOLDER', 'ID_TYPE', 'DATASET_DATE'])

id_type = args["ID_TYPE"]
output_folder = "{}/{}/{}".format(args["OUTPUT_FOLDER"], id_type, args["DATASET_DATE"])
input_folder = "{}/{}/{}".format(args["INPUT_FOLDER"], id_type, args["DATASET_DATE"])


INS_SCHEMA = StructType([
    StructField("camera_capture_timestamp", StringType(), True),
    StructField(id_type, StringType(), True),
    StructField("image_uri", StringType(), True)
])


data = spark.read.format("json").load(input_folder, schema=INS_SCHEMA)

data = data.withColumn("fnsku_1", F.col("fnsku"))

data.coalesce(1).write.partitionBy(["fnsku_1"]).mode('append').json(output_folder)   

我也尝试过重新分区而不是合并。

我正在使用 AWS Glue

4

2 回答 2

3

请考虑以下作为可能的选项之一。看看它是否有帮助会很棒:)

首先,如果你合并,正如评论中的@Lamanus 所说,这意味着你将减少分区的数量,因此也会减少写入任务的数量,从而将所有数据洗牌到 1 个任务。它可能是第一个需要改进的因素。

为了克服这个问题,即。每个分区写一个文件并保持并行化级别,您可以更改以下逻辑:

object TestSoAnswer extends App {

  private val testSparkSession = SparkSession.builder()
    .appName("Demo groupBy and partitionBy").master("local[*]")
    .getOrCreate()
  import testSparkSession.implicits._

  // Input dataset with 5 partitions
  val dataset = testSparkSession.sparkContext.parallelize(Seq(
    TestData("a", 0), TestData("a", 1), TestData("b", 0), TestData("b", 1),
    TestData("c", 1), TestData("c", 2)
  ), 5).toDF("letter", "number")

  dataset.as[TestData].groupByKey(row => row.letter)
    .flatMapGroups {
      case (_, values) => values
    }.write.partitionBy("letter").mode("append").json("/tmp/test-parallel-write")

}

case class TestData(letter: String, number: Int)

它是如何工作的?

首先,代码执行洗牌以将与特定键(与分区相同)相关的所有行收集到相同的分区。这样,它将一次对属于该键的所有行执行写入。前段时间我写了一篇关于partitionBy方法的博客文章。粗略地说,在内部它会对给定分区上的记录进行排序,然后将它们一一写入文件中。

这样我们就得到了这样的计划,其中只有 1 次随机播放,因此存在处理消耗操作:

== Physical Plan ==
*(2) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, TestData, true])).letter, true, false) AS letter#22, knownnotnull(assertnotnull(input[0, TestData, true])).number AS number#23]
+- MapGroups TestSoAnswer$$$Lambda$1236/295519299@55c50f52, value#18.toString, newInstance(class TestData), [value#18], [letter#3, number#4], obj#21: TestData
   +- *(1) Sort [value#18 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(value#18, 200), true, [id=#15]
         +- AppendColumnsWithObject TestSoAnswer$$$Lambda$1234/1747367695@6df11e91, [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, knownnotnull(assertnotnull(input[0, TestData, true])).letter, true, false) AS letter#3, knownnotnull(assertnotnull(input[0, TestData, true])).number AS number#4], [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#18]
            +- Scan[obj#2]

执行两次的输出TestSoAnswer如下所示:

test-parallel-write % ls
_SUCCESS letter=a letter=b letter=c
test-parallel-write % ls letter=a
part-00170-68245d8b-b155-40ca-9b5c-d9fb746ac76c.c000.json part-00170-cd90d64f-43c6-4582-aae6-fe443b6617f4.c000.json

test-parallel-write % ls letter=b
part-00161-68245d8b-b155-40ca-9b5c-d9fb746ac76c.c000.json part-00161-cd90d64f-43c6-4582-aae6-fe443b6617f4.c000.json

test-parallel-write % ls letter=c
part-00122-68245d8b-b155-40ca-9b5c-d9fb746ac76c.c000.json part-00122-cd90d64f-43c6-4582-aae6-fe443b6617f4.c000.json

您还可以使用此配置控制每个文件写入的记录数

编辑:没有看到@mazaneicha 的评论,但确实,你可以试试repartition("partitioning column")!比分组表达式还要清晰。

最好的,

巴托什。

于 2020-08-03T16:28:48.403 回答
0

如果除了将文件拆分为自身的较小版本之外,您不打算将 Spark 用于任何其他用途,那么我会说 Spark 是一个糟糕的选择。您最好在 AWS 中按照此 Stack Overflow 帖子中给出的方法执行此操作

假设您有一个可用的 EC2 实例,您将运行如下内容:

aws s3 cp s3://input_folder/12GB.json - | split -l 1000 - output.
aws s3 cp output.* s3://output_folder/

如果您希望在 Spark 中对数据进行进一步处理,您需要将数据重新分区为 128MB 和1 GB之间的块。使用默认(快速)压缩,您通常会得到原始文件大小的 20%。所以,在你的情况下:在 (12/5) ~3 和 (12/5/8) ~20 个分区之间,所以:

data = spark.read.format("json").load(input_folder, schema=INS_SCHEMA) 

dataPart = data.repartition(12)

对于 Spark 来说,这实际上并不是一个特别大的数据集,处理起来也不应该那么麻烦。

保存为 parquet 为您提供了一个很好的恢复点,并且重新读取数据会非常快。总文件大小约为 2.5 GB。

于 2020-08-03T20:54:12.363 回答