我目前有数据简单的数据处理,涉及分组、合并和并行列到列操作。不那么简单的部分是使用的大量行(其详细的成本/财务数据)。它的大小为 300-400 GB。
由于 RAM 有限,目前我正在使用 dask 的核心计算。然而,它真的很慢。
我之前读过使用 CuDF 来提高 map_partitions 和 groupby 的性能,但是大多数示例都使用中高端 gpu(至少 1050ti,大多数在基于 gv 的云 vm 上运行)并且数据可以适合 gpu RAM。
我的机器规格是 E5-2620v3(6C/12T)、128gb 和 K620(只有 2gb 专用 vram)。
使用的中间数据帧存储在镶木地板中。
如果我使用低端 GPU 使用 CuDF,它会更快吗?是否可以在 GPU 中进行核心计算?(例如,我环顾四周,但还没有找到)
下面是我试图做的简化伪代码
a.csv 是大小约为 300gb 的数据,由 3 列(Hier1、Hier2、Hier3、值)组成,Hier1-3 是字符串中的层次结构。value 是销售价值 b.csv 是大小约为 50gb 的数据,由 3 列(Hier1、Hier2、valuetype、cost)组成。Hier1-2 是层次结构,在字符串中。值类型是成本类型,在字符串中。成本是成本价值
基本上,我需要根据 a.csv 中的销售价值为 b.csv 中的每个成本按比例自上而下。最后,我在 Hier3 级别(更详细的级别)中都有可用的每个成本
第一步是创建按比例分配的比率:
import dask.dataframe as dd
# read raw data, repartition, convert to parquet for both file
raw_reff = dd.read_csv('data/a.csv')
raw_reff = raw_reff.map_partitions(lambda df: df.assign(PartGroup=df['Hier1']+df['Hier2']))
raw_reff = raw_reff.set_index('PartGroup')
raw_reff.to_parquet("data/raw_a.parquet")
cost_reff = dd.read_csv('data/b.csv')
cost_reff = cost_reff.map_partitions(lambda df: df.assign(PartGroup=df['Hier1']+df['Hier2']))
cost_reff = cost_reff.set_index('PartGroup')
cost_reff.to_parquet("data/raw_b.parquet")
# create reference ratio
ratio_reff = dd.read_parquet("data/raw_a.parquet").reset_index()
#to push down ram usage, instead of dask groupby im using groupby on each partition. Should be ok since its already partitioned above on each group
ratio_reff = ratio_reff.map_partitions(lambda df: df.groupby(['PartGroup'])['value'].sum().reset_index())
ratio_reff = ratio_reff.set_index('PartGroup')
ratio_reff = ratio_reff.map_partitions(lambda df: df.rename(columns={'value':'value_on_group'}))
ratio_reff.to_parquet("data/reff_a.parquet")
然后进行合并以获得比率
raw_data = dd.read_parquet("data/raw_a.parquet").reset_index()
reff_data = dd.read_parquet("data/reff_a.parquet").reset_index()
ratio_data = raw_data.merge(reff_data, on=['PartGroup'], how='left')
ratio_data['RATIO'] = ratio_data['value'].fillna(0)/ratio_data['value_on_group'].fillna(0)
ratio_data = ratio_data[['PartGroup','Hier3','RATIO']]
ratio_data = ratio_data.set_index('PartGroup')
ratio_data.to_parquet("data/ratio_a.parquet")
然后将 PartGroup 上的成本数据合并并乘以 Ratio 以获得其按比例分配的值
reff_stg = dd.read_parquet("data/ratio_a.parquet").reset_index()
cost_stg = dd.read_parquet("data/raw_b.parquet").reset_index()
final_stg = reff_stg.merge(cost_stg, on=['PartGroup'], how='left')
final_stg['allocated_cost'] = final_stg['RATIO']*final_stg['cost']
final_stg = final_stg.set_index('PartGroup')
final_stg.to_parquet("data/result_pass1.parquet")
在实际情况下,由于缺少参考数据等会导致残值,并且会使用多个参考在几遍中完成,但基本上以上是步骤
即使严格执行 parquet 到 parquet 操作,我的 128gb 中仍然需要大约 80gb 的 RAM,我的所有核心都在 100% 运行,并且运行 3-4 天。我正在寻找使用当前硬件更快完成此任务的方法。如您所见,它的大规模并行问题符合基于 gpu 的处理的定义
谢谢