I'm trying to speed up nested loops with a CPU-bound task inside:
def main():
    for i in range(10):
        cpu_bound_task()
        for j in range(10):
            cpu_bound_task()
            for k in range(100):
                cpu_bound_task()
For simplicity, the CPU-bound task is defined as:
def cpu_bound_task():
    _ = [0] * 1000000
Based on straightforward adaptations of other posts (this or this), using pool.map() on the innermost loop works fine (provided the loop is large enough; otherwise the overhead of starting the pool seems to defeat the purpose of creating one):
from multiprocessing import Pool

def main_pool():
    for i in range(10):
        cpu_bound_task()
        for j in range(10):
            cpu_bound_task()
            # a fresh pool is created (and torn down) for every j-iteration
            pool = Pool()
            # starmap with empty argument tuples calls cpu_bound_task() 100 times
            pool.starmap(func=cpu_bound_task, iterable=[()] * 100)
            pool.close()
            pool.join()
However, I have two additional requirements (illustrated with a small sketch after the list):
- The first loop must also run across 3 parallel processes (I know this won't improve speed)
- Each iteration of a loop must wait for all the processes in that iteration to complete before moving on to the next iteration
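To pin down what these requirements mean, here is a minimal sketch (`iteration` and `outer_sketch` are hypothetical names, and this is not my actual approach; the inner loops stay sequential because Pool workers are daemonic and are not allowed to spawn pools of their own): a pool capped at 3 processes runs up to 3 values of i at once, and an iteration only counts as finished once all of its nested work is done:

from multiprocessing import Pool

def iteration(i):
    # everything belonging to iteration i, still sequential inside
    cpu_bound_task()
    for j in range(10):
        cpu_bound_task()
        for k in range(100):
            cpu_bound_task()

def outer_sketch():
    with Pool(processes=3) as pool:       # requirement 1: 3 parallel processes
        pool.map(iteration, range(10))    # requirement 2: map() only returns
                                          # once every iteration has completed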
My approach (which may well not be the best!) involves creating a WaitGroup class whose instances interface with a shared queue (adding tasks to the queue, signaling task completion, and waiting for the group's tasks to complete). Several processes then run a run_func() function that pulls tasks from that queue and executes them.
The run_func() function is defined as:
def run_func(q):
    while True:
        task = q.get()
        func, kwargs = task
        if func is None:
            break  # signals end of all tasks
        func(**kwargs, q=q)
The WaitGroup class is defined as:
class WaitGroup():
    def __init__(self, group_id, max_p, wg_shared_inputs):
        self.group_id = group_id
        self.max_p = max_p  # maximum number of tasks allowed in the queue at once
        self.wait_count = wg_shared_inputs['values'][self.group_id]
        self.cv = wg_shared_inputs['conditions'][self.group_id]
    def add(self, func, kwargs, q):
        '''add task to the queue'''
        self.cv.acquire()
        if self.max_p:
            while self.wait_count.value >= self.max_p:  # >= so at most max_p tasks are in flight
                self.cv.wait()  # releases the lock while waiting, reacquires on wakeup
        q.put((func, {**kwargs, 'parent_wg': self}))
        self.wait_count.value += 1
        self.cv.release()
    def done(self):
        '''mark task as completed'''
        self.cv.acquire()
        if self.wait_count.value > 0:
            self.wait_count.value -= 1
        if self.wait_count.value == 0:
            self.cv.notify_all()  # only notifies waiters when the count reaches zero
        self.cv.release()
    def wait(self):
        '''wait for a group of tasks to be completed'''
        self.cv.acquire()
        while self.wait_count.value > 0:
            self.cv.wait()
        self.cv.release()
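For reference, this is how a single group is meant to be used in isolation (a toy sketch; `toy_task` and `demo` are illustrative names, not part of my actual code):

from multiprocessing import Manager, Process

def toy_task(i, q, parent_wg=None):
    cpu_bound_task()
    parent_wg.done()  # decrements the group counter; notifies waiters at zero

def demo():
    manager = Manager()
    q = manager.Queue()
    # a plain dict suffices here because only the parent process indexes it
    wg_shared_inputs = {'values': {0: manager.Value('i', 0)},
                        'conditions': {0: manager.Condition()}}
    workers = [Process(target=run_func, args=(q,)) for _ in range(2)]
    for p in workers: p.start()
    wg = WaitGroup(group_id=0, max_p=2, wg_shared_inputs=wg_shared_inputs)
    for i in range(5):
        wg.add(toy_task, {'i': i}, q)  # blocks while 2 tasks are already in flight
    wg.wait()                          # returns once all 5 tasks have called done()
    for _ in workers: q.put((None, {}))  # poison pills end the run_func loops
    for p in workers: p.join()

if __name__ == '__main__':
    demo()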
wg_shared_inputs is a simple dict created up front, holding freshly initialized manager.Value() and manager.Condition() instances. (Ideally the WaitGroup class would create these on demand, but unfortunately I couldn't seem to make that work, since WaitGroup instances are passed as arguments to Processes, so I have to determine in advance how many instances are needed.)
The last step is to split the loops into separate steps, with the main function defined as:
def main_q():
    # Handle Manager Variables
    from multiprocessing import Manager
    manager = Manager()
    q = manager.Queue()
    values = manager.dict({0: manager.Value('i', 0), 1: manager.Value('i', 0), 2: manager.Value('i', 0)})
    conditions = manager.dict({0: manager.Condition(), 1: manager.Condition(), 2: manager.Condition()})
    wg_shared_inputs = {'values': values, 'conditions': conditions}
    # Launch Processes
    from multiprocessing import Process
    num = 10
    processes = [Process(target=run_func, args=(q,)) for _ in range(num)]
    for p in processes: p.start()
    # Create & Launch Wait Group
    wg = WaitGroup(group_id=0, max_p=3, wg_shared_inputs=wg_shared_inputs)
    for i in range(20): wg.add(step1, {'i': i, 'wg_shared_inputs': wg_shared_inputs}, q)
    wg.wait()
    # End Queue
    for _ in range(num): q.put((None, {}))  # signals end of all tasks
    # Join Processes
    for p in processes: p.join()
and the subsequent steps defined as:
def step1(i, wg_shared_inputs, q, parent_wg=None):
    cpu_bound_task()
    wg = WaitGroup(group_id=1, max_p=1, wg_shared_inputs=wg_shared_inputs)
    for j in range(10): wg.add(step2, {'i': i, 'wg_shared_inputs': wg_shared_inputs}, q)
    wg.wait()
    parent_wg.done()

def step2(i, wg_shared_inputs, q, parent_wg=None):
    cpu_bound_task()
    wg = WaitGroup(group_id=2, max_p=None, wg_shared_inputs=wg_shared_inputs)
    for k in range(100): wg.add(step3, {'i': i, 'wg_shared_inputs': wg_shared_inputs}, q)
    wg.wait()
    parent_wg.done()

def step3(i, wg_shared_inputs, q, parent_wg=None):
    cpu_bound_task()
    parent_wg.done()
Running the 3 different versions I get:
SIMPLE VERSION (main())
Completed in 84.85 seconds
POOL VERSION (main_pool())
Completed in 62.62 seconds
QUEUE VERSION (main_q())
Completed in 131.84 seconds
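(The numbers above come from a plain wall-clock wrapper along these lines; `timed` is just an illustrative name, and the `__main__` guard matters because multiprocessing re-imports the module in child processes:)

import time

def timed(label, fn):
    start = time.perf_counter()
    fn()
    print(f'{label}\nCompleted in {time.perf_counter() - start:.2f} seconds')

if __name__ == '__main__':
    timed('SIMPLE VERSION (main())', main)
    timed('POOL VERSION (main_pool())', main_pool)
    timed('QUEUE VERSION (main_q())', main_q)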
I'm surprised by these results. Any idea why the queue version is so much slower? Or any suggestions for a different approach to achieve my goal?