这是我正在做的事情的摘要:
起初,我通过普通的多处理和熊猫包来做到这一点:
步骤 1. 获取我要阅读的文件名列表
import os
files = os.listdir(DATA_PATH + product)
步骤 2. 遍历列表
from multiprocessing import Pool
import pandas as pd
def readAndWriteCsvFiles(file):
### Step 2.1 read csv file into dataframe
data = pd.read_csv(DATA_PATH + product + "/" + file, parse_dates=True, infer_datetime_format=False)
### Step 2.2 do some calculation
### .......
### Step 2.3 write the dataframe to csv to another folder
data.to_csv("another folder/"+file)
if __name__ == '__main__':
cl = Pool(4)
cl.map(readAndWriteCsvFiles, files, chunksize=1)
cl.close()
cl.join()
代码工作正常,但速度很慢。
完成该任务大约需要 1000 秒。
library(parallel)
与使用和parSapply
功能的 R 程序进行比较。
R 程序只需要大约 160 秒。
然后我尝试使用以下代码使用 dask.delayed 和 dask.dataframe:
步骤 1. 获取我要阅读的文件名列表
import os
files = os.listdir(DATA_PATH + product)
步骤 2. 遍历列表
from dask.delayed import delayed
import dask.dataframe as dd
from dask import compute
def readAndWriteCsvFiles(file):
### Step 2.1 read csv file into dataframe
data = dd.read_csv(DATA_PATH + product + "/" + file, parse_dates=True, infer_datetime_format=False, assume_missing=True)
### Step 2.2 do some calculation
### .......
### Step 2.3 write the dataframe to csv to another folder
data.to_csv(filename="another folder/*", name_function=lambda x: file)
compute([delayed(readAndWriteCsvFiles)(file) for file in files])
这一次,我发现如果我在 dask 代码和 pandas 代码中同时注释掉第 2.3 步,dask 的运行速度会比普通的 pandas 和多处理快得多。
但是如果我调用 to_csv 方法,那么 dask 和 pandas 一样慢。
有什么解决办法吗?
谢谢