3

这是我正在做的事情的摘要:

起初,我通过普通的多处理和熊猫包来做到这一点:

步骤 1. 获取我要阅读的文件名列表

import os    
files = os.listdir(DATA_PATH + product)

步骤 2. 遍历列表

from multiprocessing import Pool
import pandas as pd    

def readAndWriteCsvFiles(file):
    ### Step 2.1 read csv file into dataframe 
    data = pd.read_csv(DATA_PATH + product + "/" + file, parse_dates=True, infer_datetime_format=False)

    ### Step 2.2 do some calculation
    ### .......

    ### Step 2.3 write the dataframe to csv to another folder
    data.to_csv("another folder/"+file)

if __name__ == '__main__':
    cl = Pool(4)
    cl.map(readAndWriteCsvFiles, files, chunksize=1)
    cl.close()
    cl.join()  

代码工作正常,但速度很慢。

完成该任务大约需要 1000 秒。

library(parallel)与使用和parSapply功能的 R 程序进行比较。

R 程序只需要大约 160 秒。

然后我尝试使用以下代码使用 dask.delayed 和 dask.dataframe:

步骤 1. 获取我要阅读的文件名列表

import os    
files = os.listdir(DATA_PATH + product)

步骤 2. 遍历列表

from dask.delayed import delayed
import dask.dataframe as dd
from dask import compute

def readAndWriteCsvFiles(file):
    ### Step 2.1 read csv file into dataframe 
    data = dd.read_csv(DATA_PATH + product + "/" + file, parse_dates=True, infer_datetime_format=False, assume_missing=True)

    ### Step 2.2 do some calculation
    ### .......

    ### Step 2.3 write the dataframe to csv to another folder
    data.to_csv(filename="another folder/*", name_function=lambda x: file)

compute([delayed(readAndWriteCsvFiles)(file) for file in files])

这一次,我发现如果我在 dask 代码和 pandas 代码中同时注释掉第 2.3 步,dask 的运行速度会比普通的 pandas 和多处理快得多。

但是如果我调用 to_csv 方法,那么 dask 和 pandas 一样慢。

有什么解决办法吗?

谢谢

4

1 回答 1

2

读取和写入 CSV 文件通常受 GIL 约束。您可能想尝试使用进程而不是线程进行并行化(dask 延迟的默认设置)。

您可以通过将scheduler='processes'关键字添加到计算调用来实现此目的。

compute([delayed(readAndWriteCsvFiles)(file) for file in files], scheduler='processes')

有关更多信息,请参阅调度文档

另外,请注意,您在这里使用的不是 dask.dataframe,而是 dask.delayed。

于 2018-09-15T10:23:21.117 回答