1

也许这真的很简单,但我在理解这一点时遇到了一些问题。

我面临的挑战是从母函数内部执行子并行函数。在等待子并行函数调用的结果时,该母函数应该只运行一次。

我写了一个小例子来说明我的困境。

import string
from joblib import Parallel, delayed
import multiprocessing

def jobToDoById(id):
    #do some other logic based on the ID given
    rand_str  = ''.join(random.choice(string.ascii_lowercase + string.ascii_uppercase + string.digits) for i in range(10))
    return [id, rand_str]


def childFunctionParallel(jobs):
    num_cores = multiprocessing.cpu_count()
    num_cores = num_cores - 1

    if __name__ == '__main__':
        p = Parallel(n_jobs=num_cores)(delayed(jobToDoById)(i) for i in jobs)
        return p

def childFunctionSerial(jobs):
    result = []
    for job in jobs:
        job_result = jobToDoById(job)
        result.append(job_result)
    return result



def motherFunction(countries_cities, doInParallel):
    result = []
    print("Start mainLogic")
    for country in countries_cities:
        city_list = countries_cities[country]
        if(doInParallel):
            cities_result = childFunctionParallel(city_list)
        else:
            cities_result = childFunctionSerial(city_list)
        result.append(cities_result)
        # ..... do some more logic

    # ..... do some more logic before returning
    print("End mainLogic")
    return result



print("Start Program")

countries_cities = {
    "United States" : ["Alabama", "Hawaii", "Mississippi", "Pennsylvania"],
    "United Kingdom" : ["Cambridge", "Coventry", "Gloucester", "Nottingham"],
    "France" : ["Marseille", "Paris", "Saint-Denis", "Nanterre", "Aubervilliers"],
    "Denmark" : ["Aarhus", "Slagelse", "Nykøbing F", "Rønne", "Odense"],
    "Australia" : ["Sydney", "Townsville", "Bendigo", "Bathurst", "Busselton"],
}
result_mother = motherFunction(countries_cities, doInParallel=True) # should be executed only once
print(result_mother) 
print("End Program")

如果您在两者之间切换doInParallelTrue那么False您可以看到问题。运行时childFunctionSerial()motherFunction()运行一次。但是当我们使用childFunctionParallelthen运行时,motherFunction()会执行多次。两者都给出相同的结果,但我遇到的问题是motherFunction()应该只执行一次。

两个问题:

1.如何重构程序,让我们执行一次母函数,
并从它内部开始并行作业,而不运行同一个母函数的多个实例?
2.我怎样才能将第二个参数传递给jobToDoById()除了id

4

1 回答 1

1

广告 2:将附加参数放入元组并传递( id, .., )

这个很简单,也很常用,所以在很多例子中都能遇到。

def jobToDoById( aTupleOfPARAMs = ( -1, ) ): # jobToDoById(id):
    #                                        #    do some other logic based on the ID given
    if not type( aTupleOfPARAMs ) is tuple:  # FUSE PROTECTION
       return [-1, "call interface violated"]
    if aTupleOfPARAMs[0] == -1:              # FUSE PROTECTION
       return [-1, None]
    # .......................................# GO GET PROCESSED:
    rand_str  = ''.join( random.choice( string.ascii_lowercase
                                      + string.ascii_uppercase
                                      + string.digits
                                        )
                                  for i in range( 10 )
                         )
    return [id, rand_str]

第一个问题有点难,但更有趣,因为系统设计在公正和真正的系统调度策略之间的主要区别[SERIAL][CONCURRENT][PARALLEL]并不总是在流行媒体中得到尊重(有时甚至在学术界)。


广告1:您可能会感到惊讶,这在当前版本中永远不会发生

您明确提到的代码joblib.Parallel和模块,但文档说:multiprocessing

默认情况下Parallel,使用 Pythonmultiprocessing模块来分叉单独的 Python工作进程,以在不同的 CPU 上同时执行任务。对于通用 Python 程序,这是一个合理的默认值,但它会产生一些开销,因为输入和输出数据需要在队列中序列化以与工作进程通信。

有两个含义 - 您的处理将支付双重, [TIME]-domain 和[SPACE]-domain 开销成本,这可能很容易变得不可接受的巨大开销成本(如果已经注意到上面引用中的“数据”和“序列化”这两个词,更好)-有关详细信息,请参见重新制定的阿姆达尔定律,详见章节:批评等

1)整个 Python 解释器,包括它的数据和内部状态是完全分叉的(所以你会得到尽可能多的副本,每个副本只运行一个流程流,这是为了不损失 GIL 循环的性能碎片/只有-1-运行-所有其他-必须等待类型的 GIL 阻塞/步进如果在基于线程的池等中进行,则存在任何 1+ 处理流。)

2)除了如上所述必须发生的所有完整的 Python 解释器 + 状态重新实例化之外,还有ALL <data-IN> +<data-OUT>是:

----------------------------MAIN-starts-to-escape-from-pure-[SERIAL]-processing--
  0:                        MAIN forks self
                                 [1]
                                 [2]
                                 ...
                                 [n_jobs] - as many copies of self as requested
   -------------------------MAIN-can-continue-in-"just"-[CONCURRENT]-after:
  1st-Data-IN-SERialised-in-MAIN's-"__main__"  
+ 2nd-Data-IN-QUEueed    in MAIN
+ 3rd-Data-IN-DEQueued              [ith_job]s
+ 4th-Data-IN-DESerialised          [ith_job]s
+ ( ...process operated the usefull [ith_job]s -<The PAYLOAD>-planned... )  
+ 5th-Data-OUT-SERialised           [ith_job]s
+ 6th-Data-OUT-QUEued               [ith_job]s
+ 7th-Data-OUT-DEQueued     in-MAIN
+ 8th-Data-OUT-DESerialised-in-MAIN's-"__main__"  
-------------------------------MAIN-can-continue-in-pure-[SERIAL]-processing-----

所有这些总是花费不可忽略的开销时间(对于方程式和细节,请参考: 开销-严格重新制定这些附加开销成本可实现的净加速,最好在潜入重构之前,您的机器将付出远远超过试图忽略这些本金和可基准管理费用所获得的收益)

为了分别以微秒为单位对这些开销成本进行基准测试,可以使用工具(但并非所有 StackOverflow 成员都对对这些进行量化稳健的基准测试感到高兴),只需在 StackOverflow上查看有关

实施的第二个主要限制joblib.Parallel,即使不是头撞,也影响到阿姆达尔定律,是与资源实际可用性无关的乐观主义,而资源状态感知调度是每个现实世界系统上发生的事情。

人们可能期望任何高度的并行代码执行,但除非对端到端(从上到下)系统覆盖采取复杂措施,否则所有处理都会进入“公正”的[CONCURRENT]时间表(即,如果资源允许 )。这方面扩展了这篇文章的内容,只是天真地放入了上面的方案中,表明如果 CPU 核心(主要是任何其他资源类)不可用,并发 将永远不会达到加速水平,资源可用性不可知的原始阿姆达尔定律是有希望的

----------------------------MAIN-starts-escape-from-processing---in-pure-[SERIAL]
  0:                        MAIN forks self                     -in-pure-[SERIAL]
                                 [1]                            -in-pure-[SERIAL]
                                 [2]                            -in-pure-[SERIAL]
                                 ...                            -in-pure-[SERIAL]
                                 [n_jobs] as many copies of self-in-pure-[SERIAL]
                                          as requested          -in-pure-[SERIAL]
  --------------------------MAIN-can-continue-in-"just"-[CONCURRENT]after[SERIAL]
+ 1st-Data-IN-SERialised-in-MAIN's-"__main__"  , job(2), .., job(n_jobs):[SERIAL]
+ 2nd-Data-IN-QEUueed    in MAIN for all job(1), job(2), .., job(n_jobs):[SERIAL]
+ 3rd-Data-IN-DEQueued              [ith_job]s:       "just"-[CONCURRENT]||X||X||
+ 4th-Data-IN-DESerialised          [ith_job]s:       "just"-[CONCURRENT]|X||X|||
+ ( ...process operated the usefull [ith_job]s-<The PAYLOAD>-planned... )||X|||X|
+ 5th-Data-OUT-SERialised           [ith_job]s:       "just"-[CONCURRENT]||||X|||
+ 6th-Data-OUT-QUEued               [ith_job]s:       "just"-[CONCURRENT]|X|X|X||
+ 7th-Data-OUT-DEQueued     in-MAIN <--l job(1), job(2), .., job(n_jobs):[SERIAL]
+ 8th-Data-OUT-DESerialised-in-MAIN's-"__main__" job(2), .., job(n_jobs):[SERIAL]
-------------------------------MAIN-can-continue-processing------in-pure-[SERIAL]
...                                                             -in-pure-[SERIAL]
于 2018-03-14T12:52:49.113 回答