我正在使用 Python 环境的 Pandas 模块制作基于内存的“大数据”实时计算模块。
所以响应时间是这个模块的质量,非常关键和重要。
为了处理大型数据集,我将数据拆分并并行处理子拆分数据。
在存储子数据结果的部分,花费了很多时间(第21行)。
我认为内部内存深拷贝出现或传递的子数据未在内存中共享。
如果我用 C 或 C++ 编写模块,我将使用指针或引用,如下所示。
“进程=进程(目标=addNewDerivedColumn,args=[resultList,&sub_dataframe ])”
或者
“进程=进程(目标=addNewDerivedColumn,args=[resultList,sub_dataframe])
def addNewDerivedColumn(resultList, split_sub_dataframe& ):...."
有没有避免内存深拷贝或减少多处理时间的好方法? “不优雅”很好。我已经准备好让我的代码变脏了。我尝试了 weekref、RawValue、RawArray、Value、Pool 但都失败了。
该模块正在 MacOS 中开发,最终将在 Linux 或 Unix 中运行。
不考虑 Windows 操作系统。
代码来了。
真实代码在我的办公室,但结构和逻辑与真实代码相同。
1 #-*- coding: UTF-8 -*-'
2 import pandas as pd
3 import numpy as np
4 from multiprocessing import *
5 import time
6
7
8 def addNewDerivedColumn(resultList, split_sub_dataframe):
9
10 split_sub_dataframe['new_column']= np.abs(split_sub_dataframe['column_01']+split_sub_dataframe['column_01']) / 2
11
12 print split_sub_dataframe.head()
13
14 '''
15 i think that the hole result of sub-dataframe is copied to resultList, not reference value
16 and in here time spend much
17 compare elapsed time of comment 21th line with the uncommented one
18 In MS Windows, signifiant difference of elapsed time doesn't show up
19 In Linux or Mac OS, the difference is big
20 '''
21 resultList.append(split_sub_dataframe)
22
23
24
25 if __name__ == "__main__":
26
27 # example data generation
28 # the record count of the real data is over 1 billion with about 10 columns.
29 dataframe = pd.DataFrame(np.random.randn(100000000, 4), columns=['column_01', 'column_02', 'column_03', 'column_04'])
30
31
32 print 'start...'
33 start_time = time.time()
34
35 # to launch 5 process in parallel, I split the dataframe to five sub-dataframes
36 split_dataframe_list = np.array_split(dataframe, 5)
37
38 # multiprocessing
39 manager = Manager()
40
41 # result list
42 resultList=manager.list()
43 processList=[]
44
45 for sub_dataframe in split_dataframe_list:
46 process=Process(target=addNewDerivedColumn, args=[resultList, sub_dataframe])
47 processList.append(process)
48
49 for proc in processList:
50 proc.start()
51 for proc in processList:
52 proc.join()
53
54
55 print 'elapsed time : ', np.round(time.time() - start_time,3)