2

我正在尝试 Python 多处理模块。在下面的代码中串行执行时间为 0.09 秒,并行执行时间为 0.2 秒。由于我没有得到加速,我想我可能会在某个地方出错

import multiprocessing as mp
from random import uniform, randrange
import time

# m = mp.Manager()
out_queue = mp.Queue()

def flop_no(rand_nos, a, b):
    cals = []
    for r in rand_nos:
        cals.append(r + a * b)
    return cals


def flop(val, a, b, out_queue):
    cals = []
    for v in val:
        cals.append(v + a * b)
    # print cals
    out_queue.put(cals)
    # print "Exec over"


def concurrency():
    # out_queue1 = mp.Queue()
    # out_queue2 = mp.Queue()
    a = 3.3
    b = 4.4
    rand_nos = [uniform(1, 4) for i in range(1000000)]
    print len(rand_nos)
    # for i in range(5):
    start_time = time.time()
    p1 = mp.Process(target=flop, args=(rand_nos[:250000], a, b, out_queue))
    p2 = mp.Process(target=flop, args=(rand_nos[250000:500000], a, b, out_queue))
    p3 = mp.Process(target=flop, args=(rand_nos[500000:750000], a, b, out_queue))
    p4 = mp.Process(target=flop, args=(rand_nos[750000:], a, b, out_queue))
    p1.start()
    out_queue.get()
    # print "\nFinal:", len(out_queue.get())
    p2.start()
    out_queue.get()
    # print "\nFinal:", len(out_queue.get())
    p3.start()
    out_queue.get()

    p4.start()
    out_queue.get()

    p1.join()
    p2.join()
    p3.join()
    p4.join()

    print "Running time parallel: ", time.time() - start_time, "secs"

def no_concurrency():
    a = 3.3
    b = 4.4
    rand_nos = [uniform(1, 4) for i in range(1000000)]
    start_time = time.time()
    cals = flop_no(rand_nos, a, b)
    print "Running time serial: ", time.time() - start_time, "secs"

if __name__ == '__main__':
    concurrency()
    no_concurrency()
    # print "Program over"

我的系统有四个核心。请让我知道我可以加速此代码的方法。另外,我有哪些使用 python 进行并行编程的选项(除了多处理模块)。

谢谢并恭祝安康

4

2 回答 2

1

爱是一种激情。. . 但如果一个人的信念只是盲目或天真地证明了证据,可能会造成很大的伤害

我喜欢 python 的易用性和通用性,但要实现 HPC 性能需要更多,还需要投入与硬件相关的洞察力和优化调整工作。

@RupjitChakraborty 正如您在下面我的回答中可能喜欢的那样,可以在纯代码中收到相同的结果,[SERIAL]最好的情况快约 50倍,比马克报告的时间快约 100 倍。随意在您的硬件上重新测试它,以便拥有一个相同的平台,以便对性能读数进行更严格的比较。尽管如此,享受对性能的追求吧!– user3666197 2017 年 12 月 1 日 13:39

如果我可以在这种永无止境的性能追求中投入几分钱:
- 尝试很好地理解原始的阿姆达尔定律 + 新的开销 - 严格的重新制定
- 尝试很好地量化出现的附加开销的成本流程管理
- 尝试很好地量化与大数据传输相关的附加开销的成本(一站式成本)
- 尽量避免任何和所有潜在的(b)锁定,有些可能隐藏在使用的构造函数的“后面”
-尽量避免同步+通信
的任何与处理无关的开销成本 - 尽量防止任何 CPU_core缓存未命中并且最好地最小化连贯性损失(是的,说起来容易,难以编码 - 即手动编写的代码通常比简单的单行代码更好,使用一些高度抽象的语法构造函数(但代价是无法管理),因为您可以在您的控制下采取更好的缓存相关决策步骤,而不是依靠某些上下文不知道的预制通用(即与​​您的特定优先级无关)代码转换来执行此操作)


想要加速?
始终孤立地系统地测试各个因素:

作为对您的代码将支付的实际成本的简要介绍(in [us]永远不要猜测,测试它。

测试用例 A:测量进程管理[SERIAL]- 进程调度附加成本
测试用例 B:测量远程进程内存分配附加成本
测试用例 C:测量远程进程[CONCURRENT]- 进程调度计算成本
测试用例 D:衡量远程流程工作负载对[CONCURRENT]调度成本的影响

有关详细信息,可以在[重要的架构、资源和流程调度事实] 一章中
进一步阅读并重用/改进幼稚的代码模板。

正如 Mark 已经警告过的那样,开销严格的 Amdahl 定律加速计算的另一个成本将来自从主进程到每个衍生的子进程的数据传输,其中纯[SERIAL]附加开销将并且确实增长超过线性比例到数据量,由于访问模式冲突、资源物理容量争用、共享对象信号化(b)锁定开销以及类似的硬件不可避免的障碍。

在深入研究性能调整选项之前,可以提出一个简单的测试用例 E:用于测量这类内存数据传输附加成本:

def a_FAT_DATA_XFER_COSTS_FUN( anIndeedFatPieceOfDATA ):
    """                                                 __doc__
    The intent of this FUN() is indeed to do nothing at all,
                             but to be able to benchmark
                             add-on overhead costs
                             raised by a need to transfer
                             some large amount of data
                             from a main()-process
                             to this FUN()-subprocess spawned.
    """
    return ( anIndeedFatPieceOfDATA[ 0]
           + anIndeedFatPieceOfDATA[-1]
             )

##############################################################
###  A NAIVE TEST BENCH
##############################################################
from zmq import Stopwatch; aClk = Stopwatch()
JOBS_TO_SPAWN =  4         # TUNE:  1,  2,  4,   5,  10, ..
RUNS_TO_RUN   = 10         # TUNE: 10, 20, 50, 100, 200, 500, 1000, ..
SIZE_TO_XFER  = 1E+6       # TUNE: +6, +7, +8,  +9, +10, ..

DATA_TO_XFER  = [ 1 for _ in range( int( SIZE_TO_XFER ) ) ]

try:
     aClk.start()
     #-----------------------------------------------------<_CODE_UNDER_TEST_>
     joblib.Parallel(  n_jobs = JOBS_TO_SPAWN
                      )( joblib.delayed( a_FAT_DATA_XFER_COSTS_FUN )
                                       ( a_FAT_DATA )
                                   for ( a_FAT_DATA )
                                   in  [       DATA_TO_XFER
                                         for _ in range( RUNS_TO_RUN )
                                         ]
                         )
     #-----------------------------------------------------<_CODE_UNDER_TEST_>
except:
     pass
finally:
     try:
         _ = aClk.stop()
     except:
         _ = -1
         pass

template = "CLK:: {0:_>24d} [us] @{1: >3d} run{2: >5d} RUNS ( {3: >12.3f}[MB]"

print( template.format( _,
                        JOBS_TO_SPAWN,
                        RUNS_TO_RUN,
                        SIZE_TO_SEND / 1024. /1024.
                        )
       )

请让我知道我可以加速此代码的方法。

  • 了解一下numba,绝对值得了解这个提高性能的工具
  • 了解操作的 矢量化
  • 在掌握了这两个之后,可能会考虑将已经完美的代码重新编写到 Cython 中

rVEC = np.random.uniform( 1, 4, 1E+6 )

def flop_NaivePY( r, a, b ):
    return(       r+(a *b ) )

aClk.start(); _ = flop_NaivePY( rVEC, a, b ); aClk.stop()
4868L
4253L
4113L
4376L
4333L
4137L
4.~_____[ms] @ 1.000.000 FLOAT-OPS, COOL, RIGHT?

然而,如果考虑性能,这段代码是非常错误的。

让我们打开numpy就地分配,避免重复的内存分配和类似的处理效率低下:

def flop_InplaceNUMPY( r, a, b ):
       r += a * b
       return r

aClk.start(); _ = flop_InplaceNUMPY( rVEC, a, b ); aClk.stop()
2459L
2426L
2658L
2444L
2421L
2430L
2429L
4.??         @ 1.000.000 FLOAT-OPS, COOL, RIGHT? NOT AS SEEN NOW
2.~!____[ms] @ 1.000.000 FLOAT-OPS, HALF, BETTER!
                                          BUT
                                          ALSO TEST THE SCALING
                                          ONCE GONE OFF CACHE,
                                          THAT TEST GET SMELL OF A NEED
                                                              TO OPTIMISE
                                                              CODE DESIGN

谨慎的实验者很快就会发现,在幼稚代码运行期间甚至可能会看到 python-process 被杀死,因为内存分配不足的请求将窒息并在大于 ~1E+9 的较大尺寸上终止)

这一切都将带来[SERIAL]代码但无需支付任何零附加成本,Gene Amdahl 叔叔将奖励您在最大代码设计期间花费的流程调度和硬件架构知识和努力。

没有更好的建议 。. . 除了从事纯粹的千里眼业务,永远无法进行重新测试

于 2017-12-01T13:23:13.160 回答
1

out_queue.get() 阻塞,直到默认情况下有可用的结果。因此,您实际上是在启动一个流程并等待它完成,然后再开始下一个流程。相反,启动所有进程,然后获得所有结果。

例子:

    #!python2
    import multiprocessing as mp
    from random import uniform, randrange
    import time

    def flop_no(rand_nos, a, b):
        cals = []
        for r in rand_nos:
            cals.append(r + a * b)
        return cals

    def flop(val, a, b, out_queue):
        cals = []
        for v in val:
            cals.append(v + a * b)
        out_queue.put(cals)
        # time.sleep(3)

    def concurrency():
        out_queue = mp.Queue()
        a = 3.3
        b = 4.4
        rand_nos = [uniform(1, 4) for i in range(1000000)]
        print len(rand_nos)
        # for i in range(5):
        start_time = time.time()
        p1 = mp.Process(target=flop, args=(rand_nos[:250000], a, b, out_queue))
        p2 = mp.Process(target=flop, args=(rand_nos[250000:500000], a, b, out_queue))
        p3 = mp.Process(target=flop, args=(rand_nos[500000:750000], a, b, out_queue))
        p4 = mp.Process(target=flop, args=(rand_nos[750000:], a, b, out_queue))

        p1.start()
        p2.start()
        p3.start()
        p4.start()

        print len(out_queue.get())
        print len(out_queue.get())
        print len(out_queue.get())
        print len(out_queue.get())

        p1.join()
        p2.join()
        p3.join()
        p4.join()

        print "Running time parallel: ", time.time() - start_time, "secs"

    def no_concurrency():
        a = 3.3
        b = 4.4
        rand_nos = [uniform(1, 4) for i in range(1000000)]
        start_time = time.time()
        cals = flop_no(rand_nos, a, b)
        print "Running time serial: ", time.time() - start_time, "secs"

    if __name__ == '__main__':
        concurrency()
        no_concurrency()
        # print "Program over" 

Output:

    1000000
    250000
    250000
    250000
    250000
    Running time parallel:  3.54999995232  secs
    Running time serial:    0.203000068665 secs

Note that parallel time is still slower.  This is due to the overhead of starting 4 other Python processes.  Your processing time for the whole job is only .2 seconds.  The 3.5 seconds for parallel is mostly just starting up the processes.  Note the commented out `# time.sleep(3)` above in `flop()`.  Add that code in and the times are:

    1000000
    250000
    250000
    250000
    250000
    Running time parallel:  6.50900006294  secs
    Running time serial:    0.203000068665 secs

The overall time only got 3 seconds faster (not 12) because they were running in parallel.  You need a lot more data to make parallel processing worthwhile.

Here's a version where you can visually see how long it takes to start the processes.  "here" is printed as each process begins to run `flop()`.  An event is used to start all threads at the same time, and only the processing time is counted:

#!python2

import multiprocessing as mp
from random import uniform, randrange
import time

def flop_no(rand_nos, a, b):
    cals = []
    for r in rand_nos:
        cals.append(r + a * b)
    return cals

def flop(val, a, b, out_queue, start):
    print 'here'
    start.wait()
    cals = []
    for v in val:
        cals.append(v + a * b)
    out_queue.put(cals)
    time.sleep(3)

def concurrency():
    out_queue = mp.Queue()
    start = mp.Event()
    a = 3.3
    b = 4.4
    rand_nos = [uniform(1, 4) for i in range(1000000)]
    print len(rand_nos)
    # for i in range(5):
    p1 = mp.Process(target=flop, args=(rand_nos[:250000], a, b, out_queue, start))
    p2 = mp.Process(target=flop, args=(rand_nos[250000:500000], a, b, out_queue, start))
    p3 = mp.Process(target=flop, args=(rand_nos[500000:750000], a, b, out_queue, start))
    p4 = mp.Process(target=flop, args=(rand_nos[750000:], a, b, out_queue, start))

    p1.start()
    p2.start()
    p3.start()
    p4.start()
    time.sleep(5) # Wait for processes to start.  See Barrier in Python 3.2+ for a better solution.
    print "go"
    start.set()
    start_time = time.time()
    print len(out_queue.get())
    print len(out_queue.get())
    print len(out_queue.get())
    print len(out_queue.get())
    print "Running time parallel: ", time.time() - start_time, "secs"

    p1.join()
    p2.join()
    p3.join()
    p4.join()

def no_concurrency():
    a = 3.3
    b = 4.4
    rand_nos = [uniform(1, 4) for i in range(1000000)]
    start_time = time.time()
    cals = flop_no(rand_nos, a, b)
    print "Running time serial: ", time.time() - start_time, "secs"

if __name__ == '__main__':
    concurrency()
    no_concurrency()
    # print "Program over"

输出:

1000000
here           # note these print about a second apart.
here
here
here
go
250000
250000
250000
250000
Running time parallel:  0.171999931335 secs
Running time serial:    0.203000068665 secs

现在,处理时间变快了。不是很多......可能是由于进程间通信以获得结果。

于 2017-12-01T06:58:29.863 回答