我是期货模块的新手,并且有一项可以从并行化中受益的任务;但我似乎无法准确地弄清楚如何为线程设置函数和为进程设置函数。我将不胜感激任何人都可以就此事提供帮助。
我正在运行粒子群优化 (PSO)。无需过多介绍 PSO 本身,以下是我的代码的基本布局:
有一个Particle
类,有一个getFitness(self)
方法(计算一些度量并将其存储在 中self.fitness
)。PSO 模拟有多个粒子实例(很容易超过 10 个;对于某些模拟,有 100 个甚至 1000 个)。
每隔一段时间,我就必须计算粒子的适应度。目前,我在 for 循环中执行此操作:
for p in listOfParticles:
p.getFitness(args)
但是,我注意到每个粒子的适应度可以相互独立地计算。这使得这种适应度计算成为并行化的主要候选者。确实,我可以做到map(lambda p: p.getFitness(args), listOfParticles)
。
现在,我可以很容易地做到这一点futures.ProcessPoolExecutor
:
with futures.ProcessPoolExecutor() as e:
e.map(lambda p: p.getFitness(args), listOfParticles)
由于调用的副作用p.getFitness
存储在每个粒子本身中,我不必担心从futures.ProcessPoolExecutor()
.
到目前为止,一切都很好。但是现在我注意到它ProcessPoolExecutor
会创建新进程,这意味着它会复制内存,这很慢。我希望能够共享内存 - 所以我应该使用线程。这很好,直到我意识到在每个进程中运行多个线程并运行多个进程可能会更快,因为多个线程仍然只在我可爱的 8 核机器的一个处理器上运行。
这是我遇到麻烦的地方:
根据我看到的示例,ThreadPoolExecutor
在list
. 也是如此ProcessPoolExecutor
。所以我不能做任何迭代的事情ProcessPoolExecutor
,ThreadPoolExecutor
因为那样ThreadPoolExecutor
会得到一个单一的对象来处理(见我的尝试,贴在下面)。
另一方面,我不能对listOfParticles
自己进行切片,因为我想ThreadPoolExecutor
发挥自己的魔力来弄清楚需要多少线程。
所以,一个大问题(终于):
我应该如何构建我的代码,以便我可以使用进程和线程有效地并行化以下内容:
for p in listOfParticles:
p.getFitness()
这是我一直在尝试的,但我不敢尝试运行它,因为我知道它不会工作:
>>> def threadize(func, L, mw):
... with futures.ThreadpoolExecutor(max_workers=mw) as executor:
... for i in L:
... executor.submit(func, i)
...
>>> def processize(func, L, mw):
... with futures.ProcessPoolExecutor() as executor:
... executor.map(lambda i: threadize(func, i, mw), L)
...
我会很感激任何关于如何解决这个问题的想法,甚至是关于如何改进我的方法
万一这很重要,我在 python3.3.2