6

我正在尝试将 Spark 用于一个非常简单的用例:给定大量文件(90k),其中包含数百万个设备的设备时间序列数据,将给定设备的所有时间序列读取分组到一组文件中(分割)。现在假设我们的目标是 100 个分区,给定的设备数据显示在同一个输出文件中并不重要,只是同一个分区。

鉴于这个问题,我们提出了两种方法来做到这一点 - repartitionthenwritewritewithpartitionBy应用于Writer. 其中任何一个的代码都非常简单:

repartition(添加了哈希列以确保与partitionBy以下代码的比较是一对一的):


df = spark.read.format("xml") \
  .options(rowTag="DeviceData") \
  .load(file_path, schema=meter_data) \
  .withColumn("partition", hash(col("_DeviceName")).cast("Long") % num_partitions) \
  .repartition("partition") \
  .write.format("json") \
  .option("codec", "org.apache.hadoop.io.compress.GzipCodec") \
  .mode("overwrite") \
  .save(output_path)

partitionBy


df = spark.read.format("xml") \
  .options(rowTag="DeviceData") \
  .load(file_path, schema=meter_data) \
  .withColumn("partition", hash(col("_DeviceName")).cast("Long") % num_partitions) \
  .write.format("json") \
  .partitionBy(“partition”) \
  .option("codec", "org.apache.hadoop.io.compress.GzipCodec") \
  .mode("overwrite") \
  .save(output_path)

在我们的测试repartition中比partitionBy. 为什么是这样?

根据我的理解repartition,我的 Spark 学习告诉我要尽可能避免这种洗牌。另一方面,partitionBy(根据我的理解)只对每个节点产生本地排序操作 - 不需要洗牌。我是否误解了一些让我认为partitionBy会更快的东西?

4

2 回答 2

9

TLDR:Spark 在您调用时触发排序partitionBy,而不是哈希重新分区。这就是为什么在您的情况下它要慢得多。

我们可以通过一个玩具示例来检查:

spark.range(1000).withColumn("partition", 'id % 100)
    .repartition('partition).write.csv("/tmp/test.csv")

DAG 1

不要注意灰色阶段,它被跳过,因为它是在以前的工作中计算的。

然后,与partitionBy

spark.range(1000).withColumn("partition", 'id % 100)
    .write.partitionBy("partition").csv("/tmp/test2.csv")

DAG2

您可以检查是否可以添加repartitionbefore partitionBy,排序仍然存在。那么发生了什么?请注意,第二个 DAG 中的排序不会触发洗牌。它是一个地图分区。事实上,当您调用 时partitionBy,spark 不会像人们一开始所期望的那样对数据进行洗牌。Spark 单独对每个分区进行排序,然后每个执行程序将其数据写入相应分区的单独文件中。因此,请注意,与partitionBy您一起写的不是num_partitions文件,而是文件之间num_partitions的东西num_partitions * num_executors。每个分区的每个执行程序都有一个文件,其中包含属于该分区的数据。

于 2021-11-15T08:33:29.457 回答
1

我认为@Oli 在他对主要答案的评论中完美地解释了这个问题。我只想加上我的 2 美分并尝试解释一下。

假设当您读取 XML 文件 [90K 文件] 时,spark 会将其读取到N个分区中。这是根据spark.sql.files.maxPartitionBytes文件格式压缩类型等因素的数量决定的。

让我们假设它是10K分区。这发生在代码的以下部分。

df = spark.read.format("xml") \
  .options(rowTag="DeviceData") \
  .load(file_path, schema=meter_data) \

假设您使用的是num_partitions = 100,您将添加一个名为partition的新列,其值为 0-99。Spark 只是向现有数据帧 [或 rdd] 添加一个新列,该数据帧被拆分为 10K 分区。

.withColumn("partition", hash(col("_DeviceName")).cast("Long") % num_partitions) \

到目前为止,两个代码都是相同的。

现在,让我们比较一下 repartition v/s partitionBy 发生了什么

案例一:重新分区

.repartition("partition") \
.write.format("json") \

在这里,您将根据具有 100 个不同值的“分区”列对现有数据框进行重新分区。因此,现有数据帧将导致完全洗牌,将分区数量从10K 减少到 100。这个阶段将是计算量很大的,因为涉及到一个完整的洗牌。如果一个特定分区的大小真的很大[倾斜分区],这也可能会失败。

但这里的优势在于,在写入发生的下一阶段,Spark 只需将100 个文件写入output_path。每个文件将仅具有与“分区”列的一个值对应的数据

案例2:partitionBy

.write.format("json") \
.partitionBy("partition") \

在这里,您要求 spark 将现有数据帧写入output_path由列"partition"的不同值分区。您无处可要求 spark 减少数据帧的现有分区数。

所以 spark 会在output_path里面创建新的文件夹, 并在里面写入每个分区对应的数据。

output_path + "\partition=0\"
output_path + "\partition=1\"
output_path + "\partition=99\"

现在,由于您在现有数据帧上有 10K 个 spark 分区,并假设这10K个分区中的每一个都具有“partition”列的所有不同值,Spark 将不得不写入10K * 100 = 1M 个文件。即,所有 10K 分区中的一部分将被写入由“partition”列创建的所有 100 个文件夹。这样,spark 将通过在其中创建子目录将1M文件写入output_path 。优点是我们使用这种方法跳过了一个完整的洗牌。

现在与案例 1中的内存计算密集型 shuffle 相比,这将慢得多,因为 Spark 必须创建 1M 文件并将它们写入持久存储。最初也是一个临时文件夹,然后是output_path

如果写入发生在 AWS S3 或 GCP Blob 等对象存储上,这将慢得多

案例三:coalesce + partitionBy

.coalesce(num_partitions) \
.write.format("json") \
.partitionBy("partition") \

在这种情况下,您将使用coalesce()将 spark 分区的数量从 10K 减少到 100,并将其写入按“partition”列分区的output_path

因此,假设这 100 个分区中的每一个都具有列"partition"的所有不同值的最坏情况,spark 将不得不写入100 * 100 = 10K 个文件。

这仍将比案例 2快,但会比案例 1慢。这是因为您正在使用coalesce()进行部分洗牌,但最终仍将10K 文件写入output_path

案例4:repartition+partitionBy

.repartition("partition") \
.write.format("json") \
.partitionBy("partition") \

在这种情况下,您将使用repartition()将 spark 分区的数量从 10K 减少到 100 [列"partition"的不同值],并将其写入按列"partition"分区的output_path

因此,这 100 个分区中的每一个只有一个不同的列“分区”值,spark 将不得不写入100 * 1 = 100 个文件。partitionBy()创建的每个子文件夹里面只有一个文件。

这将花费与案例 1相同的时间,因为这两个案例都涉及完全洗牌,然后写入100 个文件。这里唯一的区别是 100 个文件将位于output_path下的子文件夹中。

在通过 spark 或 hive 读取 output_path 时,此设置对于过滤器的谓词下推很有用。

结论:

尽管partitionByrepartition快,但取决于数据帧分区的数量和这些分区内的数据分布,仅使用 partitionBy 可能最终代价高昂。

于 2021-12-02T11:03:25.487 回答