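Maybe this is really simple, but I'm having some trouble understanding it.
The challenge I'm facing is running child functions in parallel from inside a mother function. The mother function should run only once, while it waits for the results of the parallel child calls.
I wrote a small example to illustrate my dilemma.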
```python
import random
import string
from joblib import Parallel, delayed
import multiprocessing

def jobToDoById(id):
    # do some other logic based on the ID given
    rand_str = ''.join(random.choice(string.ascii_lowercase + string.ascii_uppercase + string.digits) for i in range(10))
    return [id, rand_str]

def childFunctionParallel(jobs):
    # use one fewer worker than there are CPU cores
    num_cores = multiprocessing.cpu_count()
    num_cores = num_cores - 1
    if __name__ == '__main__':
        p = Parallel(n_jobs=num_cores)(delayed(jobToDoById)(i) for i in jobs)
    return p

def childFunctionSerial(jobs):
    result = []
    for job in jobs:
        job_result = jobToDoById(job)
        result.append(job_result)
    return result

def motherFunction(countries_cities, doInParallel):
    result = []
    print("Start mainLogic")
    for country in countries_cities:
        city_list = countries_cities[country]
        if(doInParallel):
            cities_result = childFunctionParallel(city_list)
        else:
            cities_result = childFunctionSerial(city_list)
        result.append(cities_result)
        # ..... do some more logic
    # ..... do some more logic before returning
    print("End mainLogic")
    return result

print("Start Program")
countries_cities = {
    "United States" : ["Alabama", "Hawaii", "Mississippi", "Pennsylvania"],
    "United Kingdom" : ["Cambridge", "Coventry", "Gloucester", "Nottingham"],
    "France" : ["Marseille", "Paris", "Saint-Denis", "Nanterre", "Aubervilliers"],
    "Denmark" : ["Aarhus", "Slagelse", "Nykøbing F", "Rønne", "Odense"],
    "Australia" : ["Sydney", "Townsville", "Bendigo", "Bathurst", "Busselton"],
}
result_mother = motherFunction(countries_cities, doInParallel=True) # should be executed only once
print(result_mother)
print("End Program")
```
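If you switch `doInParallel` between `True` and `False`, you can see the problem. With `childFunctionSerial()`, `motherFunction()` runs only once. But with `childFunctionParallel`, `motherFunction()` is executed multiple times. Both produce the same result, but my problem is that `motherFunction()` should be executed only once.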
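A likely explanation, hedged because it depends on how the workers are started: with spawn-style process creation (the default on Windows and macOS, and, as far as I know, also how joblib's default loky backend starts fresh worker interpreters), each worker imports the main script again, so every unguarded top-level statement, including the call to `motherFunction()`, can run once per worker. The usual remedy is to move the `if __name__ == '__main__':` guard out of `childFunctionParallel()` and wrap the script's top-level code in it instead. The minimal sketch below shows that layout. It swaps in the standard library's `multiprocessing.Pool` for joblib so it runs without extra dependencies, and the guard pattern does not depend on which pool library is used.

```python
import multiprocessing
import random
import string


def jobToDoById(job_id):
    # Stand-in for the per-ID logic: pair the ID with a random 10-character string.
    alphabet = string.ascii_lowercase + string.ascii_uppercase + string.digits
    rand_str = "".join(random.choice(alphabet) for _ in range(10))
    return [job_id, rand_str]


def childFunctionParallel(jobs):
    # One fewer worker than CPU cores, but never fewer than one.
    num_cores = max(1, multiprocessing.cpu_count() - 1)
    # No __name__ check in here: the guard belongs around the entry point below.
    with multiprocessing.Pool(processes=num_cores) as pool:
        return pool.map(jobToDoById, jobs)


def childFunctionSerial(jobs):
    return [jobToDoById(job) for job in jobs]


def motherFunction(countries_cities, doInParallel):
    print("Start mainLogic")
    result = []
    for city_list in countries_cities.values():
        if doInParallel:
            result.append(childFunctionParallel(city_list))
        else:
            result.append(childFunctionSerial(city_list))
    print("End mainLogic")
    return result


if __name__ == "__main__":
    # Only the directly launched process enters this block. With spawn/forkserver,
    # workers re-import this file as "__mp_main__" and skip it; with fork they do
    # not re-import it at all. Either way motherFunction() is called exactly once.
    countries_cities = {
        "United States": ["Alabama", "Hawaii", "Mississippi", "Pennsylvania"],
        "France": ["Marseille", "Paris", "Saint-Denis", "Nanterre", "Aubervilliers"],
    }
    print("Start Program")
    print(motherFunction(countries_cities, doInParallel=True))
    print("End Program")
```

Creating one pool inside `motherFunction()` and passing it to `childFunctionParallel()` would avoid starting a new pool for every country; the per-call pool is kept here only to mirror the original structure.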
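Two questions:

1. How can I restructure the program so that the mother function runs once and starts the parallel jobs from inside it, without running multiple instances of the same mother function?
2. How can I pass a second argument to `jobToDoById()` in addition to `id`?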
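For the second question, extra arguments can simply be passed positionally. With joblib, `delayed(jobToDoById)(i, country)` records both arguments, and `Parallel(n_jobs=num_cores)(delayed(jobToDoById)(i, country) for i in jobs)` calls `jobToDoById(i, country)` for each job. The sketch below shows the same idea with the standard library's `multiprocessing.Pool.starmap` so it runs without joblib installed. The `country` parameter is a hypothetical second argument chosen only for illustration.

```python
import multiprocessing


def jobToDoById(job_id, country):
    # Hypothetical second parameter: the country the city belongs to.
    # Stand-in for real per-ID logic that needs both values.
    return [job_id, country]


def childFunctionSerial(jobs, country):
    return [jobToDoById(job, country) for job in jobs]


def childFunctionParallel(jobs, country):
    num_cores = max(1, multiprocessing.cpu_count() - 1)
    # starmap unpacks each (job, country) tuple into jobToDoById(job, country)
    # and returns the results in the same order as the input.
    args = [(job, country) for job in jobs]
    with multiprocessing.Pool(processes=num_cores) as pool:
        return pool.starmap(jobToDoById, args)


if __name__ == "__main__":
    print(childFunctionParallel(["Aarhus", "Odense"], "Denmark"))
    # prints [['Aarhus', 'Denmark'], ['Odense', 'Denmark']]
```

When the extra argument is the same for every job, `functools.partial(jobToDoById, country=country)` combined with `pool.map` is an equivalent alternative.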