我最近一直在尝试通过multiprocessing
使用类本身内部的库来并行化我的一些代码(为了速度),并使用未绑定的方法(基本上,它是一个用户提供的函数,保存在类属性中)。它根本不起作用。
背景:我正在尝试并行化一个“并行遗传算法”类,顾名思义,它本身就是一个令人尴尬的并行问题。
据我所知,我的代码中有两个问题。(1) 用户提供的适应度函数没有导出到Pool
对象生成的过程中,而且似乎没有多少深度复制能够解决它。(2) 另一个问题是,也许该Pool
对象不确定如何处理多维输出……我真的不确定这个 tbh。
我试图开发出我的代码的一个小型独立版本,以便使事情更清晰(由于错误,它目前没有运行,这是重点):
from itertools import repeat
import multiprocessing as mp
import numpy as np
class GeneticAlgorithm:
def __init__(self, I, G, D, U, fitness_function, run_parallel):
self.fitness_function = fitness_function # User-supplied fitness function
self.D = D # Problem size (number of genes)
self.I = I # Population size (number of individuals)
self.G = G # Number of generations
self.U = U # Number of parallel populations
self.run_parallel = run_parallel
def sga(self):
'''One single-threaded genetic algorithm'''
# Activation rate is fixed at 0.5 for the sake of this MWE
pop = np.random.binomial(size=self.I * self.D, n=1, p=0.5).reshape(self.I, self.D) # Seed population
for g in range(self.G):
# fitness is computed for all individuals
fitpop = np.array([self.fitness_function(ind=ind) for ind in pop])
# fitness is scaled back to 100%
fitpop /= np.sum(fitpop)
# 2I parents are selected at random according to each individual's relative fitness score
parents = np.random.choice(range(self.I), size=2 * self.I, replace=True, p=fitpop).reshape(self.I, 2)
# Parents are crossed 2 by 2, with each pair producing exactly one offspring
crossover = np.array([np.random.choice(parents[i, :], size=self.D, replace=True) for i in range(self.I)]).reshape(self.I, self.D)
embryos = np.array([[pop[crossover[i, d], d] for d in range(self.D)] for i in range(self.I)]).reshape(self.I, self.D)
# Mutation rate is fixed at 1/D for the sake of this MWE
mutations = np.random.binomial(size=self.I * self.D, n=1, p=1 / self.D).reshape(self.I, self.D)
# "Mutated embryos" become fully fledged individuals and replace the parent generation
pop = (1 - mutations) * embryos + mutations * (1 - embryos)
# Individuals are aggregated gene-wise, with the average of active and inactive genes creating a ratio
return pop.mean(axis=0)
def pga(self):
'''Multiple parallel genetic algorithms'''
if self.run_parallel:
p = mp.Pool(mp.cpu_count())
universes = p.starmap(GeneticAlgorithm.sga, zip(repeat(self, self.U)))
p.close()
p.join()
else:
universes = np.zeros(self.U * self.D).reshape(self.U, self.D)
for u in range(self.U):
universes[u, :] = self.sga()
# Multiple GAs are aggregated in a sort of "mean of means"
return universes.mean(axis=0)
if __name__ == '__main__':
def my_fitness_function(ind):
'''Dummy fitness function, scores all individual equally...'''
return 1.0
# Dummy test to check if the code runs... it doesn't :(
ga = GeneticAlgorithm(I=10, G=3, D=5, U=10, fitness_function=my_fitness_function, run_parallel=True)
print(ga.pga())
任何类型的提示、代码或完整的解决方案都将受到欢迎。这在 R 中曾经相当容易,但是对于 Python,我显然已经束手无策了……谢谢!
ETA:修复了代码中的一些错别字并添加了run_parallel
参数以表明它在没有并行化的情况下运行得非常好。哦,是的,我也在 Windows 上运行,否则我可能会尝试使用那个Ray
库,据我所知,它可以创造奇迹,尤其是与multiprocessing
.