2

简单的问题:我读过的所有教程都向您展示了如何使用 ipython.parallel 或多处理将并行计算的结果输出到列表(或最好是字典)。

您能否指出一个使用任一库将计算结果输出到共享熊猫数据框的简单示例?

http://gouthamanbalaraman.com/blog/distributed-processing-pandas.html - 本教程向您展示如何读取输入数据帧(下面的代码),但是我将如何将 4 个并行计算的结果输出到一个数据帧?

import pandas as pd
import multiprocessing as mp

LARGE_FILE = "D:\\my_large_file.txt"
CHUNKSIZE = 100000 # processing 100,000 rows at a time

def process_frame(df):
        # process data frame
        return len(df)

if __name__ == '__main__':
        reader = pd.read_table(LARGE_FILE, chunksize=CHUNKSIZE)
        pool = mp.Pool(4) # use 4 processes

        funclist = []
        for df in reader:
                # process each data frame
                f = pool.apply_async(process_frame,[df])
                funclist.append(f)

        result = 0
        for f in funclist:
                result += f.get(timeout=10) # timeout in 10 seconds

        print "There are %d rows of data"%(result)
4

1 回答 1

2

您要求multiprocessing(或其他 python 并行模块)输出到它们不直接输出到的数据结构。如果你使用Pool任何并行包中的 , ,你最好得到一个列表(使用map)或迭代器(使用imap)。如果您使用共享内存 from multiprocessing,您可能能够将结果放入可以通过指针访问的内存块中ctypes

那么问题来了,你能将结果从迭代器或共享内存块中提取到pandas.DataFrame? 我认为答案是肯定的。是的你可以。但是,我认为我没有在教程中看到过这样做的简单示例……因为它不是那么简单。

迭代器路线似乎不太可能,因为您需要numpy先消化一个迭代器,而无需先将结果作为列表拉回 python。我会选择共享内存路线。我认为这应该给你一个输出到 aDataFrame然后你可以使用multiprocessing

from multiprocessing import sharedctypes as sh
from numpy import ctypeslib as ct        
import pandas as pd

ra = sh.RawArray('i', 4)
arr = ct.as_array(ra)
arr.shape = (2,2)
x = pd.DataFrame(arr)

然后您所要做的就是将句柄传递给数组multiprocessing.Process

import multiprocessing as mp
p1 = mp.Process(target=doit, args=(arr[:1, :], 1))
p2 = mp.Process(target=doit, args=(arr[1:, :], 2))
p1.start()
p2.start()
p1.join()
p2.join()

然后,通过一些指针魔术,结果应该填写在您的DataFrame .

我会让你编写doit函数来随心所欲地操作数组。

编辑:这看起来是一个使用类似方法的好答案……<a href="https://stackoverflow.com/a/22487898/2379433">https://stackoverflow.com/a/22487898/2379433。这似乎也有效:https ://stackoverflow.com/a/27027632/2379433 。

于 2015-05-22T11:21:49.990 回答