我正在处理一个非常大的数据集,其中包括 Spark 中的 200 个压缩 JSON 文件(每个 ~ 8G 未压缩)。我创建了一个主数据框largeDF
和几个额外的数据框来计算嵌套属性(它们是结构数组)上的聚合。我想执行一般统计计算(填充率和组数)。
对整个数据集的每次处理大约需要 20 分钟(加载文件、解压缩和执行聚合)。对于 50 个字段,它需要很长时间,因为每次我更改我的条件并一次又一次地运行带有附加过滤器的查询。
我想依靠 PySpark 的惰性求值,避免多次加载数据,所以我可以创建一个复杂的聚合并在整个数据集上应用一次,然后将所有结果转换为 Pandas。或者更好的是,如果我可以预先定义作业并要求 Spark 并行处理它们(加载一次,计算所有),然后分别返回每个作业的结果。
这些不是我的主要 ETL,但我正在尝试提取数据集的语义来编写实际的 ETL 管道。
计算 1:计算统计数据并找到所有字段的填充率:
stats = DF_large.describe().toPandas()
计算 2:使用分类数据处理简单字段:
def group_count(df, col, limit, sort, skip_null):
"""This function groups data-set on based on provided column[s], and counts each group."""
if skip_null:
df = df.where(df[col].isNotNull())
if limit:
df = df.limit(limit)
df = df.groupBy(col).count()
if sort:
df = df.sort(col, ascending=False)
return df.toPandas()
aggregations = {}
for col in group_count_list_of_columns:
aggregations[col] = group_count(largeDF, col, limit=0, skip_null=True, sort=False)
计算 3:计算和计算嵌套字段的填充率:
def get_nested_fields(spDf, col : str, limit, othercols : tuple, stats = True):
"""This function unwinds a nested array field out of data-set based on provided column, and either returns the whole or statistics of it."""
spDf = spDf.where(spDf[col].isNotNull())
df = spDf.select(F.explode(col), *othercols)
if limit:
df = df.limit(limit)
if stats:
res = df.describe().toPandas()
else:
res = df.toPandas()
return res
nested_fields_aggregate = {}
for col in nested_fields_lists:
nested_fields_aggregate[col] = get_nested_field(largeDF, col, limit=10**4, othercols =['name', 'id', 'timestamp'], stats = True)
这需要多次读取整个数据集。形状不一样,所以我不能加入。理论上应该有一种方法可以减少时间,因为没有一个计算是相互依赖的。