2

我正在尝试屏蔽然后unique对一列应用操作。下面报告了我正在使用的代码的简化版本:

import numpy as np
import pandas as pd
import dask.dataframe as dd

data = np.random.randint(0,100,(1000,2))
ddf = dd.from_pandas(pd.DataFrame(data, columns = ['data','id']), npartitions = 2)

mask = ddf['data'] > 0
unique_false = ddf[~mask]['id'].unique()
unique_true = ddf[mask]['id'].unique()

results = dask.compute([unique_true, unique_false])

这个快速示例运行良好。我的真实数据由~5000列组成,其中一列用于过滤,一列用于获取唯一 ID。数据存储在200parquet 分区中,每个分区的权重为 9MB,但在加载到内存时 ( ddf.get_partition(0).compute().info()) 权重~5GB。鉴于我有400GBRAM,我会假设我可以在80分区周围加载(考虑到其他操作的开销可能会更少)。从仪表板上我可以看到 dask 正在尝试一次执行所有任务(在内存中任务总是相同的,不管有多少工人)。

我写这个是为了测试处理一个分区所花费的时间:

start = time.time()

df = ddf.get_partition(0).compute()

mask = df['data'] > 0

unique_true = df[mask]['id'].unique()
unique_false = df[~mask]['id'].unique()

print(time.time() - start)

它需要周围60s,它需要周围7GB的 RAM。如果我启动 ProcessPool 并假设我一次只运行50分区,则需要4-5几分钟。

我知道 Dask 的核心与我对单个分区所做的完全一样,所以我的问题是为什么 Dask 会尝试并行执行所有任务而不是一次执行一个任务?有没有办法限制任务执行?这是这里真正的问题还是我错过了什么?

我在这里发现了几个问题来限制任务执行。这里的所有要点:https ://distributed.dask.org/en/latest/resources.html 。但是,我相信我不应该强迫这种行为,让 Dask 尽力而为。我还应该提到,Dask 能够在单线程中设置 5 个工作人员时运行代码,每个工作人员具有 80GB 的 RAM(但这比我提到的进程池方法花费的时间要多得多)。

我在 python3.6.10和 Dask上2.17.2

4

0 回答 0