我正在尝试屏蔽然后unique
对一列应用操作。下面报告了我正在使用的代码的简化版本:
import numpy as np
import pandas as pd
import dask.dataframe as dd
data = np.random.randint(0,100,(1000,2))
ddf = dd.from_pandas(pd.DataFrame(data, columns = ['data','id']), npartitions = 2)
mask = ddf['data'] > 0
unique_false = ddf[~mask]['id'].unique()
unique_true = ddf[mask]['id'].unique()
results = dask.compute([unique_true, unique_false])
这个快速示例运行良好。我的真实数据由~5000
列组成,其中一列用于过滤,一列用于获取唯一 ID。数据存储在200
parquet 分区中,每个分区的权重为 9MB,但在加载到内存时 ( ddf.get_partition(0).compute().info()
) 权重~5GB
。鉴于我有400GB
RAM,我会假设我可以在80
分区周围加载(考虑到其他操作的开销可能会更少)。从仪表板上我可以看到 dask 正在尝试一次执行所有任务(在内存中任务总是相同的,不管有多少工人)。
我写这个是为了测试处理一个分区所花费的时间:
start = time.time()
df = ddf.get_partition(0).compute()
mask = df['data'] > 0
unique_true = df[mask]['id'].unique()
unique_false = df[~mask]['id'].unique()
print(time.time() - start)
它需要周围60s
,它需要周围7GB
的 RAM。如果我启动 ProcessPool 并假设我一次只运行50
分区,则需要4-5
几分钟。
我知道 Dask 的核心与我对单个分区所做的完全一样,所以我的问题是为什么 Dask 会尝试并行执行所有任务而不是一次执行一个任务?有没有办法限制任务执行?这是这里真正的问题还是我错过了什么?
我在这里发现了几个问题来限制任务执行。这里的所有要点:https ://distributed.dask.org/en/latest/resources.html 。但是,我相信我不应该强迫这种行为,让 Dask 尽力而为。我还应该提到,Dask 能够在单线程中设置 5 个工作人员时运行代码,每个工作人员具有 80GB 的 RAM(但这比我提到的进程池方法花费的时间要多得多)。
我在 python3.6.10
和 Dask上2.17.2
。