3

我有 30TB 的数据按日期和小时划分,每小时分成 300 个文件。我进行了一些数据转换,然后希望数据按排序顺序排序并保存,以便 C++ 程序轻松摄取。我了解当您进行序列化时,排序仅在文件中是正确的。我希望通过更好地划分数据来规避这个问题。

我想同时按 sessionID 和时间戳排序。我不希望 sessionID 在不同文件之间拆分。如果我在 SessionID 上进行分区,我将有太多,所以我做一个模 N 来生成 N 个桶,旨在获得 1 个大约 100-200MB 的数据桶:

df = df.withColumn("bucket", F.abs(F.col("sessionId")) % F.lit(50))

然后我按日期、时间和存储桶遣返,然后再排序

df = df.repartition(50,"dt","hr","bucket")
df = df.sortWithinPartitions("sessionId","timestamp")
df.write.option("compression","gzip").partitionBy("dt","hr","bucket").parquet(SAVE_PATH)

这会将数据保存到 dt/hr/bucket,每个存储桶中有 1 个文件,但排序丢失。如果我不创建存储桶和重新分区,那么我最终会得到 200 个文件,数据是有序的,但是 sessionIds 被拆分到多个文件中。

编辑:问题似乎出在使用 保存时partitionBy("dt","hr","bucket"),它随机重新分区数据,因此不再排序。如果我没有保存,partitionBy那么我得到的正是我所期望的 - N 个存储桶/分区的 N 个文件和 sessionIds 跨越一个文件,所有文件都正确排序。所以我有一个非火花黑客手动迭代所有日期+小时目录

如果您按列分区,排序,然后使用 partitionBy 使用同一列写入,那么您希望直接转储已排序的分区,而不是对数据进行一些随机重新洗牌,这似乎是一个错误。

4

1 回答 1

0

将分区列放在已排序的列列表中可能会起到作用。

此处的完整描述 - https://stackoverflow.com/a/59161488/3061686

于 2021-06-20T17:36:49.783 回答