import multiprocessing as mp
import numpy as np

pool   = mp.Pool( processes = 4 )
inp    = np.linspace( 0.01, 1.99, 100 )
result = pool.map_async( func, inp ) #Line1 ( func is some Python function which acts on input )
output = result.get()                #Line2

So, I am trying to parallelise some code in Python, using the .map_async() method on a multiprocessing.Pool() instance.

I noticed that while
Line1 takes roughly a thousandth of a second,
Line2 takes about 0.3 seconds.

Is there a better way to do this, or a way to get around the bottleneck caused by Line2,
or
am I doing something wrong here?

(I am rather new to this.)


1 Answer


"Am I doing something wrong here?"

No panic, many users do the same: they pay way more than they receive back.

This is a common lecture, not about using some "promising" syntax-constructor, but about paying the actual costs of using it.

The story is long, the effect is plain: you expected a low-hanging fruit, yet had to pay the immense costs of process-instantiation, of work-package re-distribution and of the collection of results, all that circus just so as to do but a few rounds of func() calls.


WOW?
STOP!
Parallelisation, that was supposed to bring me a SPEEDUP of the processing, made it slower?!?

Well, who told you that any such ( potential ) speedup was for free?

Let's be quantitative and measure the actual code-execution times, instead of emotions, right?

Benchmarking is always a fair move.
It helps us, mere mortals, to escape from mere expectations
and to get ourselves into quantitative, evidence-of-record supported knowledge:

from zmq import Stopwatch; aClk = Stopwatch() # this is a handy tool to do so
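In case pyzmq is not installed, a minimal stand-in with the same two-call interface can be built on time.perf_counter() ( this helper is an assumption of mine, not part of the original answer ); it reports integer microseconds, just as zmq.Stopwatch does:

```python
import time

class PerfStopwatch:
    """A zmq.Stopwatch look-alike, built on time.perf_counter()."""
    def start( self ):
        self._t0 = time.perf_counter()          # remember the start instant
    def stop( self ):
        # return the elapsed wall-clock time, as integer microseconds
        return int( ( time.perf_counter() - self._t0 ) * 1e6 )

aClk = PerfStopwatch()
aClk.start(); _ = sum( i * i for i in range( 10000 ) ); elapsed_us = aClk.stop()
print( elapsed_us )                             # a small, platform-dependent [us] figure
```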

Testing the as-is case:

Before moving any further, one ought to record this pair:

>>> aClk.start(); _ = [   func( SEQi ) for SEQi in inp ]; aClk.stop() # [SEQ] 
>>> HowMuchWillWePAY2RUN( func, 4, 100 )                              # [RUN]
>>> HowMuchWillWePAY2MAP( func, 4, 100 )                              # [MAP]

This will set the span of the performance envelope, from a pure-[SERIAL] [SEQ]-of-calls, up to the un-optimised joblib.Parallel() or multiprocessing.Pool() cases, should one wish to extend the experiment with these or any other tools.


Test case A:

Intent:
so as to measure the cost of a { process | job }-instantiation, we need a NOP-work-package payload, that will spend almost nothing "there", yet will return "back", and that need not pay any additional add-on costs ( be it for transmissions of any input parameters or for returning any value ):

def a_NOP_FUN( aNeverConsumedPAR ):
    """                                                 __doc__
    The intent of this FUN() is indeed to do nothing at all,
                             so as to be able to benchmark
                             all the process-instantiation
                             add-on overhead costs.
    """
    pass

So, the setup-overhead add-on costs comparison goes here:

#-------------------------------------------------------<function a_NOP_FUN
[SEQ]-pure-[SERIAL] worked within ~   37 ..     44 [us] on this localhost
[MAP]-just-[CONCURRENT] tool        2536 ..   7343 [us]
[RUN]-just-[CONCURRENT] tool      111162 .. 112609 [us]

A strategy of using
joblib.delayed() on a joblib.Parallel() task-processing:

def HowMuchWillWePAY2RUN( aFun2TEST = a_NOP_FUN, JOBS_TO_SPAWN = 4, RUNS_TO_RUN = 10 ):
    import joblib                                  # joblib import was missing
    from zmq import Stopwatch; aClk = Stopwatch()
    try:
         aClk.start()
         joblib.Parallel(  n_jobs = JOBS_TO_SPAWN
                          )( joblib.delayed( aFun2TEST )
                                           ( aFunPARAM )
                                       for ( aFunPARAM )
                                       in  range( RUNS_TO_RUN )
                             )
    except:
         pass
    finally:
         try:
             _ = aClk.stop()
         except:
             _ = -1
             pass
    pass;  pMASK = "CLK:: {0:_>24d} [us] @{1: >4d}-JOBs ran{2: >6d} RUNS {3:}"
    print( pMASK.format( _,
                         JOBS_TO_SPAWN,
                         RUNS_TO_RUN,
                         " ".join( repr( aFun2TEST ).split( " ")[:2] )
                         )
            )

A strategy of using a lightweight
.map_async() method on a multiprocessing.Pool() instance:

def HowMuchWillWePAY2MAP( aFun2TEST = a_NOP_FUN, PROCESSES_TO_SPAWN = 4, RUNS_TO_RUN = 1 ):
    from zmq import Stopwatch; aClk = Stopwatch()
    try:
         import numpy           as np
         import multiprocessing as mp

         pool = mp.Pool( processes = PROCESSES_TO_SPAWN )
         inp  = np.linspace( 0.01, 1.99, 100 )

         aClk.start()
         for i in range( RUNS_TO_RUN ):          # xrange exists in Python 2 only
             pass;    result = pool.map_async( aFun2TEST, inp )
             output = result.get()
         pass
    except:
         pass
    finally:
         try:
             _ = aClk.stop()
         except:
             _ = -1
             pass
    pass;  pMASK = "CLK:: {0:_>24d} [us] @{1: >4d}-PROCs ran{2: >6d} RUNS {3:}"
    print( pMASK.format( _,
                         PROCESSES_TO_SPAWN,
                         RUNS_TO_RUN,
                         " ".join( repr( aFun2TEST ).split( " ")[:2] )
                         )
            )

So,
the first set of pain and surprises comes straight with the actual cost of doing NOTHING in a concurrent-pool of
joblib.Parallel():

 CLK:: __________________117463 [us] @   4-JOBs ran    10 RUNS <function a_NOP_FUN
 CLK:: __________________111182 [us] @   3-JOBs ran   100 RUNS <function a_NOP_FUN
 CLK:: __________________110229 [us] @   3-JOBs ran   100 RUNS <function a_NOP_FUN
 CLK:: __________________110095 [us] @   3-JOBs ran   100 RUNS <function a_NOP_FUN
 CLK:: __________________111794 [us] @   3-JOBs ran   100 RUNS <function a_NOP_FUN
 CLK:: __________________110030 [us] @   3-JOBs ran   100 RUNS <function a_NOP_FUN
 CLK:: __________________110697 [us] @   3-JOBs ran   100 RUNS <function a_NOP_FUN
 CLK:: _________________4605843 [us] @ 123-JOBs ran   100 RUNS <function a_NOP_FUN
 CLK:: __________________336208 [us] @ 123-JOBs ran   100 RUNS <function a_NOP_FUN
 CLK:: __________________298816 [us] @ 123-JOBs ran   100 RUNS <function a_NOP_FUN
 CLK:: __________________355492 [us] @ 123-JOBs ran   100 RUNS <function a_NOP_FUN
 CLK:: __________________320837 [us] @ 123-JOBs ran   100 RUNS <function a_NOP_FUN
 CLK:: __________________308365 [us] @ 123-JOBs ran   100 RUNS <function a_NOP_FUN
 CLK:: __________________372762 [us] @ 123-JOBs ran   100 RUNS <function a_NOP_FUN
 CLK:: __________________304228 [us] @ 123-JOBs ran   100 RUNS <function a_NOP_FUN
 CLK:: __________________337537 [us] @ 123-JOBs ran   100 RUNS <function a_NOP_FUN
 CLK:: __________________941775 [us] @ 123-JOBs ran 10000 RUNS <function a_NOP_FUN
 CLK:: __________________987440 [us] @ 123-JOBs ran 10000 RUNS <function a_NOP_FUN
 CLK:: _________________1080024 [us] @ 123-JOBs ran 10000 RUNS <function a_NOP_FUN
 CLK:: _________________1108432 [us] @ 123-JOBs ran 10000 RUNS <function a_NOP_FUN
 CLK:: _________________7525874 [us] @ 123-JOBs ran100000 RUNS <function a_NOP_FUN

So, this scientifically fair and rigorous test has, starting from this simplest-ever case, already shown the benchmarked costs of all the associated code-execution processing setup-overheads, the smallest-ever joblib.Parallel() penalty sine-qua-non.

This forwards us into the direction where real-world algorithms do live: best with next adding some larger and larger "payload" sizes into the testing loop.
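One way to sketch such a growing-payload sweep ( the payload() stand-in below is illustrative, not from the original answer ) is to time a CPU-bound O(n) work-package at several sizes and watch where useful work starts to dominate the fixed overheads:

```python
import time

def payload( n ):
    # an O(n) CPU-bound stand-in for a real work-package
    s = 0
    for i in range( n ):
        s += i * i
    return s

for n in ( 10, 1000, 100000 ):
    t0 = time.perf_counter()
    for _ in range( 100 ):
        payload( n )
    print( n, int( ( time.perf_counter() - t0 ) * 1e6 ), "[us] per 100 calls" )
```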



Now, knowing the penalty for entering a "just"-[CONCURRENT] code-execution, what next?

Using this systematic and lightweight approach, we may go forward, as we will also need to benchmark the add-on costs and the other indirect Amdahl's Law effects of { remote-job-PAR-XFER(s) | remote-job-MEM.alloc(s) | remote-job-CPU-bound-processing | remote-job-fileIO(s) }.
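The classical Amdahl's Law speedup S = 1 / ( ( 1 - p ) + p / N ) can be extended with such add-on terms; a minimal sketch ( the function name and the lump-overhead parameter oT are illustrative assumptions of mine ) that charges the overhead on top of the split run-time:

```python
def overhead_aware_speedup( p, N, T1, oT ):
    """p  .. parallelisable fraction of the single-process run-time
       N  .. number of workers
       T1 .. single-process run-time
       oT .. total add-on overhead ( spawn + transfer + collect )"""
    return T1 / ( ( 1 - p ) * T1 + p * T1 / N + oT )

print( overhead_aware_speedup( p = 0.95, N = 4, T1 = 1.0, oT = 0.0 ) )  # ~3.48x, the textbook case
print( overhead_aware_speedup( p = 0.95, N = 4, T1 = 1.0, oT = 1.0 ) )  # < 1.0x, a net slowdown
```

The second call shows the point of the whole lecture: once the add-on overheads grow comparable to the useful run-time, the "speedup" drops below 1.0x, no matter how many workers were spawned.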

Function templates like the ones above may help in such re-testing ( as you see, there will be a lot to re-run, while O/S noise and some additional artifacts will step into the actual cost-of-use patterns ):


Test case B:

Once we have paid the up-front costs of process-instantiation, the next most common mistake is to forget the costs of memory allocations. So, let's test these:

def a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR( aNeverConsumedPAR, SIZE1D = 1000 ):
    """                                                 __doc__
    The intent of this FUN() is to do nothing but
                             a MEM-allocation
                             so as to be able to benchmark
                             all the process-instantiation
                             add-on overhead costs.
    """
    import numpy as np              # yes, deferred import, libs do defer imports
    aMemALLOC = np.zeros( ( SIZE1D, #       so as to set
                            SIZE1D, #       realistic ceilings
                            SIZE1D, #       as how big the "Big Data"
                            SIZE1D  #       may indeed grow into
                            ),
                          dtype = np.float64,
                          order = 'F'
                          )         # .ALLOC + .SET
    aMemALLOC[2,3,4,5] = 8.7654321  # .SET
    aMemALLOC[3,3,4,5] = 1.2345678  # .SET

    return aMemALLOC[2:3,3,4,5]

If your platform stops being able to allocate the requested memory-blocks, we head into another kind of problem ( and a class of hidden glass-ceilings, if trying to go-parallel in a physical-resources-agnostic manner ). One may edit the SIZE1D scaling, so as to at least fit into the platform RAM addressing / sizing capabilities, yet the performance envelopes of the real-world problem computing remain of great interest here:

>>> HowMuchWillWePAY2RUN( a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR, 200, 1000 )

may yield
a cost-to-pay of anywhere between 0.1 [s] and +9 [s] (!!)
just for doing STILL NOTHING, but now also without forgetting about some realistic MEM-allocation add-on costs "there":

CLK:: __________________116310 [us] @   4-JOBs ran    10 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
CLK:: __________________120054 [us] @   4-JOBs ran    10 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
CLK:: __________________129441 [us] @  10-JOBs ran   100 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
CLK:: __________________123721 [us] @  10-JOBs ran   100 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
CLK:: __________________127126 [us] @  10-JOBs ran   100 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
CLK:: __________________124028 [us] @  10-JOBs ran   100 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
CLK:: __________________305234 [us] @ 100-JOBs ran   100 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
CLK:: __________________243386 [us] @ 100-JOBs ran   100 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
CLK:: __________________241410 [us] @ 100-JOBs ran   100 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
CLK:: __________________267275 [us] @ 100-JOBs ran   100 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
CLK:: __________________244207 [us] @ 100-JOBs ran   100 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
CLK:: __________________653879 [us] @ 100-JOBs ran  1000 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
CLK:: __________________405149 [us] @ 100-JOBs ran  1000 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
CLK:: __________________351182 [us] @ 100-JOBs ran  1000 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
CLK:: __________________362030 [us] @ 100-JOBs ran  1000 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
CLK:: _________________9325428 [us] @ 200-JOBs ran  1000 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
CLK:: __________________680429 [us] @ 200-JOBs ran  1000 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
CLK:: __________________533559 [us] @ 200-JOBs ran  1000 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
CLK:: _________________1125190 [us] @ 200-JOBs ran  1000 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
CLK:: __________________591109 [us] @ 200-JOBs ran  1000 RUNS <function a_NOP_FUN_WITH_JUST_A_MEM_ALLOCATOR
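Before even launching such a test, a cheap paper-check of the RAM footprint that a SIZE1D**4 float64 block would request can save the head-banging into that glass-ceiling ( the helper below is an illustrative assumption of mine, not from the original answer ):

```python
import numpy as np

def mem_footprint_GB( SIZE1D, dtype = np.float64 ):
    # bytes-per-element times the element count of a 4-D cube
    return np.dtype( dtype ).itemsize * SIZE1D**4 / 2**30

print( mem_footprint_GB( 1000 ) )   # ~7450.6 GB ... no platform will grant this
print( mem_footprint_GB(  100 ) )   # ~0.745  GB ... per worker, times N workers
```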

Test case C:

please read the tail section of this post

Test case D:

please read the tail section of this post


Epilogue:

For each and every "promise", the fairest next step is to first cross-validate the actual code-execution costs, before starting any code re-engineering. The sum of the real-world platform's add-on costs may devastate any expected speedup, even though the original, overhead-naive Amdahl's Law might have promised some expected speedup effects.

As Mr. W. Edwards Deming has expressed so many times: without data, we are left with nothing but opinions.


A bonus part:
having read up to here, one may have already found that there is not any kind of "drawback" or "error" in #Line2 per se, but rather that careful design practice will show any better syntax-constructor, one that spends less so as to achieve more ( as the actual resources ( CPU, MEM, IO, O/S ) of the code-execution platform permit ). Anything else is not principally different from just blindly telling fortunes.

Answered on 2017-12-09T17:11:34.743