7

我正在使用 Python 环境的 Pandas 模块制作基于内存的“大数据”实时计算模块。

所以响应时间是这个模块的质量,非常关键和重要。

为了处理大型数据集,我将数据拆分并并行处理子拆分数据。

在存储子数据结果的部分,花费了很多时间(第21行)。

我认为内部内存深拷贝出现或传递的子数据未在内存中共享。

如果我用 C 或 C++ 编写模块,我将使用指针或引用,如下所示。

“进程=进程(目标=addNewDerivedColumn,args=[resultList,&sub_dataframe ])”

或者

“进程=进程(目标=addNewDerivedColumn,args=[resultList,sub_dataframe])

def addNewDerivedColumn(resultList, split_sub_dataframe& ):...."

有没有避免内存深拷贝或减少多处理时间的好方法? “不优雅”很好。我已经准备好让我的代码变脏了。我尝试了 weekref、RawValue、RawArray、Value、Pool 但都失败了。

该模块正在 MacOS 中开发,最终将在 Linux 或 Unix 中运行。

不考虑 Windows 操作系统。

代码来了。

真实代码在我的办公室,但结构和逻辑与真实代码相同。

1 #-*- coding: UTF-8 -*-' 
2 import pandas as pd
3 import numpy as np
4 from multiprocessing import *
5 import time
6
7
8 def addNewDerivedColumn(resultList, split_sub_dataframe):
9    
10    split_sub_dataframe['new_column']=    np.abs(split_sub_dataframe['column_01']+split_sub_dataframe['column_01']) / 2
11    
12    print split_sub_dataframe.head()
13    
14    '''
15     i think that the hole result of sub-dataframe is copied to resultList, not reference value 
16     and in here time spend much
17     compare elapsed time of comment 21th line with the uncommented one
18     In MS Windows, signifiant difference of elapsed time doesn't show up
19     In Linux or Mac OS, the difference is big
20    '''
21    resultList.append(split_sub_dataframe)
22    
23
24
25 if __name__ == "__main__":
26    
27    # example data generation
28    # the record count of the real data is over 1 billion with about 10 columns.
29    dataframe = pd.DataFrame(np.random.randn(100000000, 4), columns=['column_01', 'column_02', 'column_03', 'column_04'])
30    
31
32    print 'start...'
33    start_time = time.time()
34    
35    # to launch 5 process in parallel, I split the dataframe to five sub-dataframes
36    split_dataframe_list = np.array_split(dataframe, 5)
37    
38    # multiprocessing 
39    manager = Manager()
40    
41    # result list
42    resultList=manager.list()
43    processList=[]
44    
45    for sub_dataframe in split_dataframe_list:
46        process=Process(target=addNewDerivedColumn, args=[resultList, sub_dataframe])
47        processList.append(process)
48        
49    for proc in processList: 
50        proc.start()
51    for proc in processList: 
52        proc.join()
53    
54    
55    print 'elapsed time  : ', np.round(time.time() - start_time,3)
4

2 回答 2

9

如果您将进程间通信保持在最低限度,您将获得更好的性能。因此,与其传递子数据帧作为参数,不如传递索引值。子进程可以对公共 DataFrame 本身进行切片。

当一个子进程被生成时,它会得到一个在父进程的调用模块中定义的所有全局变量的副本。因此,如果在生成多处理池之前df在全局变量中定义了大型 DataFrame ,那么每个生成的子进程都可以访问.df

在没有 的 Windows 上,fork()将启动一个新的 python 进程并导入调用模块。因此,在 Windows 上,产生的子进程必须df从头开始重新生成,这可能需要时间和更多的内存。

但是,在 Linux 上,您有写时复制。这意味着生成的子进程访问(调用模块的)原始全局变量而不复制它们。只有当子进程尝试修改全局时,Linux 才会在修改值之前制作单独的副本。

因此,如果您避免在子流程中修改全局变量,您可以获得性能提升。我建议仅将子进程用于计算。返回计算的值,让主进程整理结果修改原始DataFrame。

import pandas as pd
import numpy as np
import multiprocessing as mp
import time

def compute(start, end):
    sub = df.iloc[start:end]
    return start, end, np.abs(sub['column_01']+sub['column_01']) / 2

def collate(retval):
    start, end, arr = retval
    df.ix[start:end, 'new_column'] = arr

def window(seq, n=2):
    """
    Returns a sliding window (of width n) over data from the sequence
    s -> (s0,s1,...s[n-1]), (s1,s2,...,sn), ...
    """
    for i in range(len(seq)-n+1):
        yield tuple(seq[i:i+n])

if __name__ == "__main__":
    result = []
    # the record count of the real data is over 1 billion with about 10 columns.
    N = 10**3
    df = pd.DataFrame(np.random.randn(N, 4),
                      columns=['column_01', 'column_02', 'column_03', 'column_04'])

    pool = mp.Pool()    
    df['new_column'] = np.empty(N, dtype='float')

    start_time = time.time()
    idx = np.linspace(0, N, 5+1).astype('int')
    for start, end in window(idx, 2):
        # print(start, end)
        pool.apply_async(compute, args=[start, end], callback=collate)

    pool.close()
    pool.join()
    print 'elapsed time  : ', np.round(time.time() - start_time,3)
    print(df.head())
于 2013-11-10T07:43:58.683 回答
1

受这个问题和@unutbu 回答的启发,我在github上写了一个并行版本的 map 。该函数适用于多核单机无限并行处理只读大数据结构。基本思想类似于@unutbu 建议的,使用临时全局变量来保存大数据结构(例如,数据框),并将其“名称”而不是变量本身传递给工作人员。但是所有这些都封装在一个 map 函数中,因此它几乎是标准 map 函数的一个替代品,在 pathos 包的帮助下。示例用法如下,

# Suppose we process a big dataframe with millions of rows.
size = 10**9
df = pd.DataFrame(np.random.randn(size, 4),
                  columns=['column_01', 'column_02', 
                           'column_03', 'column_04'])
# divide df into sections of 10000 rows; each section will be
# processed by one worker at a time
section_size = 10000
sections = [xrange(start, start+section_size) 
            for start in xrange(0, size, section_size)]

# The worker function that processes one section of the
# df. The key assumption is that a child 
# process does NOT modify the dataframe, but do some 
# analysis or aggregation and return some result.
def func(section, df):
    return some_processing(df.iloc[section])

num_cores = 4
# sections (local_args) specify which parts of a big object to be processed;
# global_arg holds the big object to be processed to avoid unnecessary copy;
# results are a list of objects each of which is the processing results 
# of one part of a big object (i.e., one element in the iterable sections) 
# in order.
results = map(func, sections, global_arg=df,
              chunksize=10, 
              processes=num_cores)

# reduce results (assume it is a list of data frames)
result = pd.concat(results)

在我的一些文本挖掘任务中,将 df 直接传递给工作函数的幼稚并行实现甚至比单线程版本还要慢,因为大数据帧的复制操作代价高昂。然而,上述实现可以为具有 4 核的任务提供 3 倍以上的加速,这似乎非常接近真正的轻量级多线程。

于 2014-12-29T03:28:09.460 回答