3

我对 Python 多处理感到困惑。

我正在尝试加速处理来自数据库的字符串的函数,但我一定误解了多处理的工作原理,因为该函数在分配给工作人员池时比“正常处理”需要更长的时间。

这是我想要实现的一个例子。

from time import clock, time
from multiprocessing import Pool, freeze_support

from random import choice


def foo(x):
    TupWerteMany = []
    for i in range(0,len(x)):
         TupWerte = []
          s = list(x[i][3])
          NewValue = choice(s)+choice(s)+choice(s)+choice(s)
          TupWerte.append(NewValue)
          TupWerte = tuple(TupWerte)

          TupWerteMany.append(TupWerte)
     return TupWerteMany



 if __name__ == '__main__':
     start_time = time()
     List = [(u'1', u'aa', u'Jacob', u'Emily'),
        (u'2', u'bb', u'Ethan', u'Kayla')]
     List1 = List*1000000

     # METHOD 1 : NORMAL (takes 20 seconds) 
     x2 = foo(List1)
     print x2[1:3]

     # METHOD 2 : APPLY_ASYNC (takes 28 seconds)
     #    pool = Pool(4)
     #    Werte = pool.apply_async(foo, args=(List1,))
     #    x2 = Werte.get()
     #    print '--------'
     #    print x2[1:3]
     #    print '--------'

     # METHOD 3: MAP (!! DOES NOT WORK !!)

     #    pool = Pool(4)
     #    Werte = pool.map(foo, args=(List1,))
     #    x2 = Werte.get()
     #    print '--------'
     #    print x2[1:3]
     #    print '--------'


     print 'Time Elaspse: ', time() - start_time

我的问题:

  1. 为什么 apply_async 比“正常方式”花费更长的时间?
  2. 我在地图上做错了什么?
  3. 通过多处理来加速此类任务是否有意义?
  4. 最后:在我读完这里之后,我想知道 python 中的多处理是否可以在 Windows 上运行?
4

3 回答 3

2

所以你的第一个问题是没有发生实际的并行性foo(x),你将整个列表传递给函数一次。

1) 进程池的想法是让许多进程对某些数据的不同位进行计算。

 # METHOD 2 : APPLY_ASYNC
 jobs = 4
 size = len(List1)
 pool = Pool(4)
 results = []
 # split the list into 4 equally sized chunks and submit those to the pool
 heads = range(size/jobs, size, size/jobs) + [size]
 tails = range(0,size,size/jobs)
 for tail,head in zip(tails, heads):
      werte = pool.apply_async(foo, args=(List1[tail:head],))
      results.append(werte)

 pool.close()
 pool.join() # wait for the pool to be done

 for result in results:
      werte = result.get() # get the return value from the sub jobs

如果处理每个块所需的时间大于启动进程所需的时间,这只会给你一个实际的加速,在四个进程和四个作业要完成的情况下,当然,如果你'我们有 4 个流程和 100 个工作要做。请记住,您正在创建一个全新的 python 解释器四次,这不是免费的。

2) map 的问题是它适用于单独进程中的foo每个元素,这将需要相当长的时间。List1因此,如果您的池中有 4 个进程map将弹出列表中的一个项目四次并将其发送到要处理的进程 - 等待进程完成 - 弹出更多列表的内容 - 等待进程完成。仅当处理单个项目需要很长时间时才有意义,例如,如果每个项目都是指向 1 GB 文本文件的文件名。但就目前而言,map 只会获取列表的单个字符串并将其传递到fooasapply_async获取列表的一部分的地方。试试下面的代码

def foo(thing):
    print thing

map(foo, ['a','b','c','d'])

那是内置的 python 映射,将运行单个进程,但多进程版本的想法完全相同。

根据 JFSebastian 的评论添加:但是,您可以使用chunksize参数map来指定每个块的近似大小。

pool.map(foo, List1, chunksize=size/jobs) 

我不知道map在 Windows 上是否有问题,因为我没有可用于测试的。

3)是的,鉴于您的问题足够大,可以证明分叉出新的 python 解释器是合理的

4)不能给你一个明确的答案,因为它取决于核心/处理器的数量等,但一般来说它在 Windows 上应该没问题。

于 2012-08-24T20:24:24.560 回答
0

关于问题 (2) 在 Dougal 和 Matti 的指导下,我弄清楚了哪里出了问题。原始的 foo 函数处理列表列表,而 map 需要一个函数来处理单个元素。

新功能应该是

def foo2 (x):
    TupWerte = []
    s = list(x[3])
    NewValue = choice(s)+choice(s)+choice(s)+choice(s)
    TupWerte.append(NewValue)
    TupWerte = tuple(TupWerte)
    return TupWerte

以及调用它的块:

jobs = 4
size = len(List1)
pool = Pool()
#Werte = pool.map(foo2, List1, chunksize=size/jobs)
Werte = pool.map(foo2, List1)
pool.close()
print Werte[1:3]

感谢所有帮助我理解这一点的人。

所有方法的结果: 对于 List * 2 Mio 记录:正常 13.3 秒,与异步并行:7.5 秒,与带有 chunksize 的 map 并行:7.3,没有 chunksize 5.2 秒

于 2012-08-25T07:00:21.717 回答
-2

如果您有兴趣,这是一个通用的多处理模板。

import multiprocessing as mp
import time

def worker(x):
    time.sleep(0.2)
    print "x= %s, x squared = %s" % (x, x*x)
    return x*x

def apply_async():
    pool = mp.Pool()
    for i in range(100):
        pool.apply_async(worker, args = (i, ))
    pool.close()
    pool.join()

if __name__ == '__main__':
    apply_async()

输出如下所示:

x= 0, x squared = 0
x= 1, x squared = 1
x= 2, x squared = 4
x= 3, x squared = 9
x= 4, x squared = 16
x= 6, x squared = 36
x= 5, x squared = 25
x= 7, x squared = 49
x= 8, x squared = 64
x= 10, x squared = 100
x= 11, x squared = 121
x= 9, x squared = 81
x= 12, x squared = 144

如您所见,这些数字不是按顺序排列的,因为它们是异步执行的。

于 2012-08-24T20:30:45.103 回答