1

我正在尝试实现类似 df.apply 的函数,但在数据帧的块中并行化。我编写了以下测试代码,看看我能获得多少(相对于数据复制等):

from multiprocessing import Pool
from functools import partial
import pandas as pd
import numpy as np
import time

def df_apply(df, f):
    return df.apply(f, axis=1)

def apply_in_parallel(df, f, n=5):
    pool = Pool(n)
    df_chunks = np.array_split(df, n)
    apply_f = partial(df_apply, f=f)
    result_list = pool.map(apply_f, df_chunks)
    return pd.concat(result_list, axis=0)

def f(x):
  return x+1

if __name__ == '__main__':
  N = 10^8
  df = pd.DataFrame({"a": np.zeros(N), "b": np.zeros(N)})

  print "parallel"
  t0 = time.time()
  r = apply_in_parallel(df, f, n=5)
  print time.time() - t0

  print "single"
  t0 = time.time()
  r = df.apply(f, axis=1)
  print time.time() - t0

奇怪的行为:对于 N=10^7 它适用于 N=10^8 它给了我一个错误

Traceback (most recent call last):
  File "parallel_apply.py", line 27, in <module>
    r = apply_in_parallel(df, f, n=5)
  File "parallel_apply.py", line 14, in apply_in_parallel
    result_list = pool.map(apply_f, df_chunks)
  File "/usr/lib64/python2.7/multiprocessing/pool.py", line 227, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/usr/lib64/python2.7/multiprocessing/pool.py", line 528, in get
    raise self._value
AttributeError: 'numpy.ndarray' object has no attribute 'apply'

有谁知道这里发生了什么?对于这种并行化方式的任何反馈,我也将不胜感激。我期望函数花费的时间比每行和数百万行的 inc 或 sum 更多。

谢谢!

4

2 回答 2

1

array_split接受任何类似数组的参数(包括pandas.DataFrame对象),但只返回保证它返回一个numpy.ndarray(DataFrames不是)。当然,ndarrays 没有apply方法,这正是您看到的错误。我真的很惊讶这在任何情况下都有效。您要么需要将数据帧拆分为子帧,要么应用对 ndarray 进行操作的函数。

于 2014-11-04T04:25:54.390 回答
1

N = 10^8results 2, N = 10^7results 13,因为运算符^是 XOR (不是幂)。所以一个 2 行长度的 df 不能被分成 5 个块。改用这个:N = 10**4N = 10**5. 使用这些值,您将看到时间上的差异。请注意大于N = 10**6(在此值下并行时间约为 30 秒,单次时间约为 167 秒)的值。并pool.close()在最后(之前return)使用 inapply_in_parallel()自动关闭池中的所有工作人员。

于 2015-05-05T09:08:07.913 回答