1

我有一个配置单元格式和快速压缩的镶木地板文件。它适合内存,pandas.info 提供以下数据。

parquet 文件中每组的行数仅为 100K

>>> df.info()
<class 'pandas.core.frame.DataFrame'>
Index: 21547746 entries, YyO+tlZtAXYXoZhNr3Vg3+dfVQvrBVGO8j1mfqe4ZHc= to oE4y2wK5E7OR8zyrCHeW02uTeI6wTwT4QTApEVBNEdM=
Data columns (total 8 columns):
payment_method_id         int16
payment_plan_days         int16
plan_list_price           int16
actual_amount_paid        int16
is_auto_renew             bool
transaction_date          datetime64[ns]
membership_expire_date    datetime64[ns]
is_cancel                 bool
dtypes: bool(2), datetime64[ns](2), int16(4)
memory usage: 698.7+ MB

现在,用 dask 进行一些简单的计算,我得到以下时间

使用线程

>>>time.asctime();ddf.actual_amount_paid.mean().compute();time.asctime()
'Fri Oct 13 23:44:50 2017'
141.98732048354384
'Fri Oct 13 23:44:59 2017'

使用分布式(本地集群)

>>> c=Client()
>>> time.asctime();ddf.actual_amount_paid.mean().compute();time.asctime()
'Fri Oct 13 23:47:04 2017'
141.98732048354384
'Fri Oct 13 23:47:15 2017'
>>> 

没关系,每个大约 9 秒。

现在使用多处理,惊喜来了……

>>> time.asctime();ddf.actual_amount_paid.mean().compute(get=dask.multiprocessing.get);time.asctime()
'Fri Oct 13 23:50:43 2017'
141.98732048354384
'Fri Oct 13 23:57:49 2017'
>>> 

我希望多处理和分布式/本地集群处于同一数量级,可能与线程有一些差异(无论好坏)

但是,多处理需要 47 倍的时间才能在 in16 列上生成简单的平均值?

我的环境只是一个带有所需模块的全新 conda 安装。没有任何东西的精心挑选。

为什么会有这种差异?我无法管理 dask/distributed 以使其具有可预测的行为,以便能够根据问题的性质在不同的调度程序之间做出明智的选择。

这只是一个玩具示例,但我无法找到符合我期望的示例(至少我对阅读文档的理解)。

有什么我应该记住的吗?还是我完全错过了重点?

谢谢

JC

4

1 回答 1

1

使用线程调度程序,每个任务都可以访问进程的所有内存——在这种情况下是所有数据——因此可以在没有任何内存复制的情况下进行计算。

使用分布式调度程序,调度程序知道哪个线程和哪个工作人员正在生成后续任务所需的数据,或者内存中已经有该数据。调度器的巧妙之处在于将计算转移给正确的工作人员,以避免数据通信和复制。

相反,多进程调度器倾向于向主进程发送任务结果和从主进程发送任务结果,这可能涉及大量的序列化和复制。有些任务可以融合在一起(通过调用链中的许多 python 函数来组合任务),但有些不能。任何序列化和复制都需要 CPU 工作,而且对您来说可能更重要的是内存空间。如果您的原始数据占系统总数的很大一部分,则您可能正在填满物理内存,从而导致很大的因素减速。

于 2017-10-17T19:30:55.693 回答