6

让我们定义:

from multiprocessing import Pool
import numpy as np
def func(x):
    for i in range(1000):
        i**2
    return 1

请注意,func()它做了一些事情,它总是返回一个小数字1

然后,我比较了一个 8 核并行Pool.map()v/sa 串行,python 内置,map()

n=10**3
a=np.random.random(n).tolist()

with Pool(8) as p:
    %timeit -r1 -n2  p.map(func,a)
%timeit -r1 -n2  list(map(func,a))

这给出了:

38.4 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 2 loops each)
200 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 2 loops each)

这显示了相当好的并行缩放。因为我用的是 8 个核心,38.3 [ms]大约是 1/8200[s]

然后让我们尝试Pool.map()一些更大的列表,为简单起见,我以这种方式使用列表列表:

n=10**3
m=10**4
a=np.random.random((n,m)).tolist()

with Pool(8) as p:
    %timeit -r1 -n2  p.map(func,a)
%timeit -r1 -n2  list(map(func,a))

这使 :

292 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 2 loops each)
209 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 2 loops each)

你看,并行缩放已经消失了!1s ~ 1.76s

我们可以让它变得更糟,尝试让每个子列表通过更大:

n=10**3
m=10**5
a=np.random.random((n,m)).tolist()

with Pool(8) as p:
    %timeit -r1 -n2  p.map(func,a)
%timeit -r1 -n2  list(map(func,a))

这给出了:

3.29 s ± 0 ns per loop (mean ± std. dev. of 1 run, 2 loops each)
179 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 2 loops each)

哇,更大的子列表,时间结果完全相反。我们使用 8 个核心来获得慢 20 倍的时序!!

您还可以注意到 serialmap()的时间与子列表大小无关。因此,一个合理的解释是,Pool.map()真正将那些大子列表的内容传递给导致额外副本的进程?

我不知道。但如果是这样,为什么它不传递子列表的地址?毕竟,子列表已经在内存中,并且在实践中func()我使用的保证不会更改/修改子列表。

那么,在 python 中,在将某些操作映射到大型事物列表时保持并行缩放的正确方法是什么?

4

2 回答 2

9

您的工作职能过早结束:

In [2]: %timeit func(1)
335 µs ± 12.6 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

所以你基本上是在测量多处理的开销。

改变你的工作函数来做更多的工作,比如循环1000 * 1000次数而不是1000次数,你会看到它再次缩放,1000000循环在我的 Mac 上的成本大致0.4s很高,这与开销相比已经足够高了。

下面是n我的mac上不同的测试结果,我使用Pool(4)的是4核,测试只运行一次而不是多次%timeit,因为差异是微不足道的:

加速图

您可以看到加速比与 成比例增加n,多处理的开销由每个工作函数调用共享。

背后的数学,假设每次调用的开销是相等的:

ratio = {time_{single} \over time_{mp}} = {cost_{work} * n \over {{cost_{work} * n \over p_{cores}} + cost_{overhead} * n}} = { 1 \over {\dfrac{1}{p_{cores}} + {cost_{overhead} \over cost_{work}} }}

如果我们想要ratio > 1

1 - {\dfrac{1}{p_{cores}} > {cost_{overhead} \over cost_{work}} }

大约等于:

在此处输入图像描述

这意味着,如果与每次调用开销相比,工作函数运行得太快multiprocessing,则无法扩展。

于 2020-02-24T17:28:07.050 回答
9

在我们开始
并深入研究任何纳秒的搜索之前(没错,它很快就会开始,因为每个[ns]问题都很重要,因为缩放打开了问题的整个潘多拉盒子),让我们就规模达成一致- 最简单且通常“便宜”为时过早一旦问题规模扩大到现实规模,技巧可能而且经常会破坏你的梦想 - 数千个(在两个迭代器中都可以看到)对于缓存计算< 0.5 [ns]数据获取的行为方式不同,与一旦超过 L1 /L2/L3-cache-sizes 用于高于 s 的比例1E+5, 1E+6, 1E+9,[GB]其中每个对齐的提取都比几个更昂贵100 [ns]

“...因为我有 8 个核心,我想用它们来获得 8 倍的速度

希望。然而,很抱歉直截了当地说实话,世界不是这样运作的。

查看这个交互式工具,它将向您展示加速限制及其对初始问题的实际扩展的实际生产成本的主要依赖性,因为它从微不足道的大小增长,并且这些组合效应在规模上 只需单击- 它和滑块以实时查看它的实际效果

动画 UI 交互递减法则,重新制定,以便还包含处理附加设置/终止开销

:(是)Pool.map()真的将那些大子列表的内容传递到导致额外副本的进程周围吗?

是的,
它必须这样做,按照设计
,它通过将所有数据“通过”另一个“昂贵的” SER/DES 处理来实现这一点,
以使其发生在“那里”交付。当你试图“返回”
一些乳齿象大小的结果时,反之亦然,但你没有,在上面。

但是如果是这样,为什么它不传递子列表的地址?

因为远程(参数接收)进程是另一个完全自治的进程,具有自己的、独立的和受保护的地址空间,我们不能只是将地址引用“传入”,我们希望它是一个完全独立的、自治的工作 python 进程(由于愿意使用这个技巧来逃避GIL 锁舞),不是吗?当然我们做到了——这是我们摆脱 GIL-Wars 的核心步骤(为了更好地理解 GIL-lock 的利弊,可能会喜欢这个这个(Pg.15+ on CPU-bound processing)。

             0.1 ns - NOP
             0.3 ns - XOR, ADD, SUB
             0.5 ns - CPU L1 dCACHE reference           (1st introduced in late 80-ies )
             0.9 ns - JMP SHORT
             1   ns - speed-of-light (a photon) travel a 1 ft (30.5cm) distance -- will stay, throughout any foreseeable future :o)
?~~~~~~~~~~~ 1   ns - MUL ( i**2 = MUL i, i )~~~~~~~~~ doing this 1,000 x is 1 [us]; 1,000,000 x is 1 [ms]; 1,000,000,000 x is 1 [s] ~~~~~~~~~~~~~~~~~~~~~~~~~
           3~4   ns - CPU L2  CACHE reference           (2020/Q1)
             5   ns - CPU L1 iCACHE Branch mispredict
             7   ns - CPU L2  CACHE reference
            10   ns - DIV
            19   ns - CPU L3  CACHE reference           (2020/Q1 considered slow on 28c Skylake)
            71   ns - CPU cross-QPI/NUMA best  case on XEON E5-46*
           100   ns - MUTEX lock/unlock
           100   ns - own DDR MEMORY reference
           135   ns - CPU cross-QPI/NUMA best  case on XEON E7-*
           202   ns - CPU cross-QPI/NUMA worst case on XEON E7-*
           325   ns - CPU cross-QPI/NUMA worst case on XEON E5-46*
        10,000   ns - Compress 1K bytes with a Zippy PROCESS
        20,000   ns - Send     2K bytes over 1 Gbps  NETWORK
       250,000   ns - Read   1 MB sequentially from  MEMORY
       500,000   ns - Round trip within a same DataCenter
?~~~ 2,500,000   ns - Read  10 MB sequentially from  MEMORY~~(about an empty python process to copy on spawn)~~~~ x ( 1 + nProcesses ) on spawned process instantiation(s), yet an empty python interpreter is indeed not a real-world, production-grade use-case, is it?
    10,000,000   ns - DISK seek
    10,000,000   ns - Read   1 MB sequentially from  NETWORK
?~~ 25,000,000   ns - Read 100 MB sequentially from  MEMORY~~(somewhat light python process to copy on spawn)~~~~ x ( 1 + nProcesses ) on spawned process instantiation(s)
    30,000,000   ns - Read 1 MB sequentially from a  DISK
?~~ 36,000,000   ns - Pickle.dump() SER a 10 MB object for IPC-transfer and remote DES in spawned process~~~~~~~~ x ( 2 ) for a single 10MB parameter-payload SER/DES + add an IPC-transport costs thereof or NETWORK-grade transport costs, if going into [distributed-computing] model Cluster ecosystem
   150,000,000   ns - Send a NETWORK packet CA -> Netherlands
  |   |   |   |
  |   |   | ns|
  |   | us|
  | ms|

“在大型事物列表上并行映射某些操作时,保持并行扩展的正确方法是什么?”

在此处输入图像描述

A )
了解避免或至少减少费用的方法:了解您必须支付并将支付

的所有类型费用

  • 花费尽可能低的流程实例化成本相当昂贵)最好只作为一次性成本

    在 macOS 上,spawnstart 方法现在是默认方法。start 方法应该被认为是不安全的fork,因为它可能导致子进程崩溃。请参阅bpo-33725

  • 花费尽可能少的参数传递成本(是的,最好避免重复传递那些“大东西”作为参数)

  • 永远不要将资源浪费在不执行您的工作的事情上 - (永远不会产生比报告更多的进程len( os.sched_getaffinity( 0 ) )- 任何超过此数量的进程都会等待其下一个 CPU 核心插槽,并且只会驱逐其他缓存高效的进程,因此,一旦已经支付了重新获取所有数据的所有获取成本,就需要重新支付所有的获取成本,以便将它们重新存储在缓存中,以便很快再次被驱逐出缓存计算,而到目前为止以这种方式工作的那些进程是由于天真地使用尽可能多的multiprocessing.cpu_count()-reported 进程而被驱逐(有什么好处? Pool
  • 更好地重用预先分配的内存,而不是继续花费临时内存分配成本 ALAP
  • 永远不要分享一点,如果表演是目标
  • 从不阻塞,从不 -gc如果不避免,python 可能会阻塞,或者Pool.map()阻塞

B
了解提高效率的方法:

了解所有提高效率的技巧,即使以代码复杂性为代价(一些 SLOC-s 很容易在教科书中显示,但牺牲了效率和性能 - 尽管在整个扩展过程中(无论是问题大小还是迭代深度,或者同时增长两者),这两者都是你争取可持续性能的主要敌人。

来自A的某些类别的实际成本)已经极大地改变了理论上可实现的加速限制,这些限制可以预期进入某种形式的[PARALLEL]流程编排(在这里,使代码执行的某些部分在生成的子中执行)流程),其最初的观点早在 60 多年前由 Gene Amdahl 博士首次提出(最近添加了两个与流程实例化相关的设置+终止增加成本的主要扩展(非常在 py2 always 和 py3.5+ for MacOS 和 Windows 中很重要)和 an atomicity-of-work,这将在下面讨论。

Amdahl 定律加速 S 的开销严格重新制定:

S   = speedup which can be achieved with N processors
s   = a proportion of a calculation, which is [SERIAL]
1-s = a parallelizable portion, that may run  [PAR]
N   = a number of processors ( CPU-cores ) actively participating on [PAR] processing

               1
S =  __________________________; where s, ( 1 - s ), N were defined above
                ( 1 - s )            pSO:= [PAR]-Setup-Overhead     add-on cost/latency
     s  + pSO + _________ + pTO      pTO:= [PAR]-Terminate-Overhead add-on cost/latency
                    N               

开销严格和资源感知的重新制定:

                           1                         where s, ( 1 - s ), N
S =  ______________________________________________ ;      pSO, pTO
                   | ( 1 - s )             |               were defined above
     s  + pSO + max|  _________ , atomicP  |  + pTO        atomicP:= a unit of work,
                   |     N                 |                         further indivisible,
                                                                     a duration of an
                                                                     atomic-process-block

使用您的 python 在目标 CPU/RAM 设备上制作原型,缩放 >>1E+6

任何简化的模型示例都会以某种方式扭曲您对实际工作负载如何在体内执行的期望。低估的 RAM 分配,在小规模上看不到,后来可能会在规模上大吃一惊,有时甚至会使操作系统进入缓慢状态,交换和颠簸。numba.jit()一些更智能的工具(

from multiprocessing import Pool
import numpy as np
import os

SCALE = int( 1E9 )
STEP  = int( 1E1 )
aLIST = np.random.random( ( 10**3, 10**4 ) ).tolist()

#######################################################################################
#   func() does some SCALE'd amount of work, yet
#                                                passes almost zero bytes as parameters
#                                                allocates nothing, but iterator
#                                                returns one byte,
#                                                invariant to any expensive inputs
def func( x ):  
    for i in range( SCALE ):
        i**2
    return 1

关于降低管理成本的扩展策略的一些提示:

#####################################################################################
#   more_work_en_block() wraps some SCALE'd amount of work, sub-list specified
def more_work_en_block( en_block = [ None, ] ):
    return [ func( nth_item ) for nth_item in en_block ]

如果确实必须传递一个大列表,最好传递更大的块,远程迭代其部分(而不是为每个传递的每个项目支付传输成本,而不是使用sub_blocks(参数得到 SER/DES 处理(~ pickle.dumps()+ ) [per-each-call] 的成本pickle.loads(),再次以附加成本计算,这会降低由此产生的效率并恶化扩展的、间接费用严格的阿姆达尔定律的间接费用部分)

#####################################################################################
#   some_work_en_block() wraps some SCALE'd amount of work, tuple-specified
def some_work_en_block( sub_block = ( [ None, ], 0, 1 ) ):
    return more_work_en_block( en_block = sub_block[0][sub_block[1]:sub_block[2]] )

调整流程实例的数量:

aMaxNumOfProcessesThatMakesSenseToSPAWN = len( os.sched_getaffinity( 0 ) ) # never more

with Pool( aMaxNumOfProcessesThatMakesSenseToSPAWN ) as p:
     p.imap_unordered( more_work_en_block, [ ( aLIST,
                                               start,
                                               start + STEP
                                               )
                                           for start in range( 0, len( aLIST ), STEP ) ] )

最后但并非最不重要的一点是,预计智能矢量化代码的智能使用会带来巨大的性能提升numpy,最好不要重复传递静态的、预复制的(在进程实例化期间),因此支付为合理缩放的成本,这里是不可避免的成本其中)BLOB,在代码中使用,无需通过参数传递传递相同的数据,以向量化(CPU 非常高效)的方式作为只读数据。一些关于如何进行~ +500 x加速的例子可以在这里这里阅读,关于但~ +400 x加速或关于加速的案例~ +100 x以及一些问题隔离测试场景的例子。

无论如何,模型代码越接近您的实际工作负载,基准测试就越有意义(在规模和生产中)。


祝你探索世界好运,因为它是
不同的,不是梦想,
不是希望它不同,或者我们希望它是

:o)

事实和科学很重要 - 两者+一起

证据记录是实现尽可能高绩效的核心步骤,
不是任何产品营销,
不是任何福音化部落战争,
不是任何博客帖子的喋喋不休

至少不要说你没有被警告

:o)


于 2020-02-27T06:59:54.200 回答