2

我想我了解重新分区、配置单元分区和分桶如何影响输出文件的数量,但我不太清楚各种功能的交互。有人可以帮助填写以下每种情况下我留空的输出文件的数量吗?目的是了解正确的代码适用于以下情况:我需要对高基数列和低基数列进行分区/分桶,在这种情况下我有频繁的操作来过滤低基数列,然后加入高基数列。

假设我们有一个数据框df,它以 200 个输入分区开始,colA有 10 个唯一值,并且colB有 1000 个唯一值。

首先检查我的理解:

  • df.repartition(100)= 100 个相同大小的输出文件

  • df.repartition('colA')= 10 个不同大小的输出文件,因为每个文件将包含 1 个 colA 值的所有行

  • df.repartition('colB')= 1000 个输出文件

  • df.repartition(50, 'colA')= 50 个输出文件?

  • df.repartition(50, 'colB')= 50 个输出文件,所以有些文件会包含多个 colB 的值?

Hive 分区:

  • output.write_dataframe(df, partition_cols=['colA'])= 1,000 个输出文件(因为我在 10 个配置单元分区 10 中的每个分区中可能有 100 个文件)

  • output.write_dataframe(df, partition_cols=['colB'])= 10,000 个输出文件

  • output.write_dataframe(df, partition_cols=['colA', 'colB'])= 100,000 个输出文件

  • output.write_dataframe(df.repartition('colA'), partition_cols=['colA'])= 10个不同大小的输出文件(每个hive分区1个文件)

分桶:

  • output.write_dataframe(df, bucket_cols=[‘colB’], bucket_count=100)= 100 个输出文件?在一个实验中,情况似乎并非如此

  • output.write_dataframe(df, bucket_cols=[‘colA’], bucket_count=10)= 10 个输出文件?

  • output.write_dataframe(df.repartition(‘colA’), bucket_cols=[‘colA’], bucket_count=10)= ???

现在都在一起了:

  • output.write_dataframe(df, partition_cols=[‘colA’], bucket_cols=[‘colB’], bucket_count=200)= ???

  • output.write_dataframe(df.repartition(‘colA’, ‘colB’), partition_cols=[‘colA’], bucket_cols=[‘colB’], bucket_count=200)= ???-- 这是我最后要使用的命令吗?任何下游都会首先过滤 colA 以利用 hive 分区,然后加入 colB 以利用分桶?

4

1 回答 1

3

对于 hive 分区 + 分桶,输出文件的 # 不是恒定的,将取决于输入分区的实际数据。为了澄清,假设 df 是 200 个分区,而不是 200 个文件。输出文件随输入分区数而不是文件数缩放。200 个文件可能会产生误导,因为这可能是 1 个分区到 1000 个分区。

首先检查我的理解:

df.repartition(100)= 100 个相同大小的输出文件

df.repartition('colA')= 10 个不同大小的输出文件,因为每个文件将包含 1 个 colA 值的所有行

df.repartition('colB')= 1000 个输出文件

df.repartition(50, 'colA')= 50 个输出文件

df.repartition(50, 'colB')= 50 个输出文件

Hive 分区:

output.write_dataframe(df, partition_cols=['colA'])= 2,000 个输出文件的上限(200 个输入分区 * 每个分区最多 10 个值)

output.write_dataframe(df, partition_cols=['colB'])= 最多 200,000 个输出文件(每个分区 200 * 1000 个值)

output.write_dataframe(df, partition_cols=['colA', 'colB'])= 最多 2,000,000 个输出文件(200 个分区 * 10 个值 * 1000)

output.write_dataframe(df.repartition('colA'), partition_cols=['colA'])= 10个不同大小的输出文件(每个hive分区1个文件)

分桶:

output.write_dataframe(df, bucket_cols=[‘colB’], bucket_count=100)= 最多 20,000 个文件(200 个分区 * 每个分区最多 100 个桶)

output.write_dataframe(df, bucket_cols=[‘colA’], bucket_count=10)= 最多 2,000 个文件(200 个分区 * 每个分区最多 10 个存储桶)

output.write_dataframe(df.repartition(‘colA’), bucket_cols=[‘colA’], bucket_count=10)= 正好 10 个文件(重新分区的数据集有 10 个输入分区,每个分区只输出到 1 个桶)

现在都在一起了:

output.write_dataframe(df, partition_cols=[‘colA’], bucket_cols=[‘colB’], bucket_count=200)= 我可能错了,但我相信它最多有 400,000 个输出文件(200 个输入分区 * 10 个 colA 分区 * 200 个 colB 存储桶)

output.write_dataframe(df.repartition(‘colA’, ‘colB’), partition_cols=[‘colA’], bucket_cols=[‘colB’], bucket_count=200)= 我相信这正好是 10,000 个输出文件(重新分区 colA,colB = 10,000 个分区,每个分区正好包含 1 个 colA 和 1 个 colB 桶)

于 2020-09-21T17:11:04.717 回答