0

我目前有数据简单的数据处理,涉及分组、合并和并行列到列操作。不那么简单的部分是使用的大量行(其详细的成本/财务数据)。它的大小为 300-400 GB。

由于 RAM 有限,目前我正在使用 dask 的核心计算。然而,它真的很慢。

我之前读过使用 CuDF 来提高 map_partitions 和 groupby 的性能,但是大多数示例都使用中高端 gpu(至少 1050ti,大多数在基于 gv 的云 vm 上运行)并且数据可以适合 gpu RAM。

我的机器规格是 E5-2620v3(6C/12T)、128gb 和 K620(只有 2gb 专用 vram)。

使用的中间数据帧存储在镶木地板中。

如果我使用低端 GPU 使用 CuDF,它会更快吗?是否可以在 GPU 中进行核心计算?(例如,我环顾四周,但还没有找到)

下面是我试图做的简化伪代码

a.csv 是大小约为 300gb 的数据,由 3 列(Hier1、Hier2、Hier3、值)组成,Hier1-3 是字符串中的层次结构。value 是销售价值 b.csv 是大小约为 50gb 的数据,由 3 列(Hier1、Hier2、valuetype、cost)组成。Hier1-2 是层次结构,在字符串中。值类型是成本类型,在字符串中。成本是成本价值

基本上,我需要根据 a.csv 中的销售价值为 b.csv 中的每个成本按比例自上而下。最后,我在 Hier3 级别(更详细的级别)中都有可用的每个成本

第一步是创建按比例分配的比率:

import dask.dataframe as dd
# read raw data, repartition, convert to parquet for both file
raw_reff = dd.read_csv('data/a.csv')
raw_reff = raw_reff.map_partitions(lambda df: df.assign(PartGroup=df['Hier1']+df['Hier2']))
raw_reff = raw_reff.set_index('PartGroup')
raw_reff.to_parquet("data/raw_a.parquet")

cost_reff = dd.read_csv('data/b.csv')
cost_reff = cost_reff.map_partitions(lambda df: df.assign(PartGroup=df['Hier1']+df['Hier2']))
cost_reff = cost_reff.set_index('PartGroup')
cost_reff.to_parquet("data/raw_b.parquet")

# create reference ratio
ratio_reff = dd.read_parquet("data/raw_a.parquet").reset_index()

#to push down ram usage, instead of dask groupby im using groupby on each partition. Should be ok since its already partitioned above on each group

ratio_reff = ratio_reff.map_partitions(lambda df: df.groupby(['PartGroup'])['value'].sum().reset_index())
ratio_reff = ratio_reff.set_index('PartGroup')
ratio_reff = ratio_reff.map_partitions(lambda df: df.rename(columns={'value':'value_on_group'}))
ratio_reff.to_parquet("data/reff_a.parquet")

然后进行合并以获得比率

raw_data = dd.read_parquet("data/raw_a.parquet").reset_index()
reff_data = dd.read_parquet("data/reff_a.parquet").reset_index()
ratio_data = raw_data.merge(reff_data, on=['PartGroup'], how='left')
ratio_data['RATIO'] = ratio_data['value'].fillna(0)/ratio_data['value_on_group'].fillna(0)
ratio_data = ratio_data[['PartGroup','Hier3','RATIO']]
ratio_data = ratio_data.set_index('PartGroup')
ratio_data.to_parquet("data/ratio_a.parquet")

然后将 PartGroup 上的成本数据合并并乘以 Ratio 以获得其按比例分配的值

reff_stg = dd.read_parquet("data/ratio_a.parquet").reset_index()
cost_stg = dd.read_parquet("data/raw_b.parquet").reset_index()
final_stg = reff_stg.merge(cost_stg, on=['PartGroup'], how='left')
final_stg['allocated_cost'] = final_stg['RATIO']*final_stg['cost']
final_stg = final_stg.set_index('PartGroup')
final_stg.to_parquet("data/result_pass1.parquet")

在实际情况下,由于缺少参考数据等会导致残值,并且会使用多个参考在几遍中完成,但基本上以上是步骤

即使严格执行 parquet 到 parquet 操作,我的 128gb 中仍然需要大约 80gb 的 RAM,我的所有核心都在 100% 运行,并且运行 3-4 天。我正在寻找使用当前硬件更快完成此任务的方法。如您所见,它的大规模并行问题符合基于 gpu 的处理的定义

谢谢

4

1 回答 1

1

@Ditto,不幸的是,您当前的硬件无法做到这一点。您的 K620 具有 Kepler 架构 GPU,并且低于 RAPIDS 的最低要求。您将需要一张 Pascal 卡或更好的卡来运行 RAPIDS。好消息是,如果购买兼容 RAPIDS 的视频卡不是一个可行的选择,那么有许多廉价的云配置选项。老实说,你要做什么,我想要一点额外的 GPU 处理速度,并建议使用多 GPU 设置。

对于比 GPU RAM 更大的数据集,您可以使用 dask_cudf 来处理您的数据集。我们的文档和笔记本中有几个示例。请注意,在 dask.compute() 之后生成的数据集需要能够放入 GPU RAM。

https://rapidsai.github.io/projects/cudf/en/0.12.0/10min.html#10-Minutes-to-cuDF-and-Dask-cuDF

https://rapidsai.github.io/projects/cudf/en/0.12.0/dask-cudf.html#multi-gpu-with-dask-cudf

一旦你可以得到一个工作的、兼容 RAPID 的、多 GPU 设置并使用 dask_cudf,你应该得到一个非常值得的加速,特别是对于这种规模的数据探索。

希望这可以帮助!

于 2020-02-20T21:43:33.537 回答