0

我最近一直在尝试通过multiprocessing使用类本身内部的库来并行化我的一些代码(为了速度),并使用未绑定的方法(基本上,它是一个用户提供的函数,保存在类属性中)。它根本不起作用。

背景:我正在尝试并行化一个“并行遗传算法”类,顾名思义,它本身就是一个令人尴尬的并行问题。

据我所知,我的代码中有两个问题。(1) 用户提供的适应度函数没有导出到Pool对象生成的过程中,而且似乎没有多少深度复制能够解决它。(2) 另一个问题是,也许该Pool对象不确定如何处理多维输出……我真的不确定这个 tbh。

我试图开发出我的代码的一个小型独立版本,以便使事情更清晰(由于错误,它目前没有运行,这是重点):

from itertools import repeat
import multiprocessing as mp
import numpy as np

class GeneticAlgorithm:
    def __init__(self, I, G, D, U, fitness_function, run_parallel):
        self.fitness_function = fitness_function # User-supplied fitness function
        self.D = D # Problem size (number of genes)
        self.I = I # Population size (number of individuals)
        self.G = G # Number of generations
        self.U = U # Number of parallel populations
        self.run_parallel = run_parallel

    def sga(self):
        '''One single-threaded genetic algorithm'''
        # Activation rate is fixed at 0.5 for the sake of this MWE
        pop = np.random.binomial(size=self.I * self.D, n=1, p=0.5).reshape(self.I, self.D) # Seed population

        for g in range(self.G):
            # fitness is computed for all individuals
            fitpop = np.array([self.fitness_function(ind=ind) for ind in pop])
            # fitness is scaled back to 100%
            fitpop /= np.sum(fitpop)
            # 2I parents are selected at random according to each individual's relative fitness score
            parents = np.random.choice(range(self.I), size=2 * self.I, replace=True, p=fitpop).reshape(self.I, 2)
            # Parents are crossed 2 by 2, with each pair producing exactly one offspring
            crossover = np.array([np.random.choice(parents[i, :], size=self.D, replace=True) for i in range(self.I)]).reshape(self.I, self.D)
            embryos = np.array([[pop[crossover[i, d], d] for d in range(self.D)] for i in range(self.I)]).reshape(self.I, self.D)
            # Mutation rate is fixed at 1/D for the sake of this MWE
            mutations = np.random.binomial(size=self.I * self.D, n=1, p=1 / self.D).reshape(self.I, self.D)
            # "Mutated embryos" become fully fledged individuals and replace the parent generation
            pop = (1 - mutations) * embryos + mutations * (1 - embryos)

        # Individuals are aggregated gene-wise, with the average of active and inactive genes creating a ratio
        return pop.mean(axis=0)

    def pga(self):
        '''Multiple parallel genetic algorithms'''
        if self.run_parallel:
            p = mp.Pool(mp.cpu_count())
            universes = p.starmap(GeneticAlgorithm.sga, zip(repeat(self, self.U)))
            p.close()
            p.join()
        else:
            universes = np.zeros(self.U * self.D).reshape(self.U, self.D)
            
            for u in range(self.U):
                universes[u, :] = self.sga()

        # Multiple GAs are aggregated in a sort of "mean of means"
        return universes.mean(axis=0)

if __name__ == '__main__':
    def my_fitness_function(ind):
        '''Dummy fitness function, scores all individual equally...'''
        return 1.0

    # Dummy test to check if the code runs... it doesn't :(
    ga = GeneticAlgorithm(I=10, G=3, D=5, U=10, fitness_function=my_fitness_function, run_parallel=True)
    print(ga.pga())

任何类型的提示、代码或完整的解决方案都将受到欢迎。这在 R 中曾经相当容易,但是对于 Python,我显然已经束手无策了……谢谢!

ETA:修复了代码中的一些错别字并添加了run_parallel参数以表明它在没有并行化的情况下运行得非常好。哦,是的,我也在 Windows 上运行,否则我可能会尝试使用那个Ray库,据我所知,它可以创造奇迹,尤其是与multiprocessing.

4

1 回答 1

0

看我对你帖子的评论。此外,池中的每个进程都应该唯一地为随机数生成器提供种子。否则,它们将生成相同的随机数序列。

请注意,由于创建进程池以及将参数和结果从一个进程的地址空间传递到另一个地址空间的开销,多处理不一定会运行得更快。您的工作函数sga必须具有足够的 CPU 密集型才能使多处理具有优势,我现在明白它实际上是.

from itertools import repeat
import multiprocessing as mp
import numpy as np

# this will be executed by each process in the pool:
def init_pool():
    from threading import current_thread
    ident = current_thread().ident
    np.random.seed(ident)

def my_fitness_function(ind):
    '''Dummy fitness function, scores all individual equally...'''
    return 1.0

class GeneticAlgorithm:
    def __init__(self, I, G, D, U, fitness_function, run_parallel):
        self.fitness_function = fitness_function # User-supplied fitness function
        self.D = D # Problem size (number of genes)
        self.I = I # Population size (number of individuals)
        self.G = G # Number of generations
        self.U = U # Number of parallel populations
        self.run_parallel = run_parallel

    def sga(self):
        '''One single-threaded genetic algorithm'''
        # Activation rate is fixed at 0.5 for the sake of this MWE
        pop = np.random.binomial(size=self.I * self.D, n=1, p=0.5).reshape(self.I, self.D) # Seed population

        for g in range(self.G):
            # fitness is computed for all individuals
            fitpop = np.array([self.fitness_function(ind=ind) for ind in pop])
            # fitness is scaled back to 100%
            fitpop /= np.sum(fitpop)
            # 2I parents are selected at random according to each individual's relative fitness score
            parents = np.random.choice(range(self.I), size=2 * self.I, replace=True, p=fitpop).reshape(self.I, 2)
            # Parents are crossed 2 by 2, with each pair producing exactly one offspring
            crossover = np.array([np.random.choice(parents[i, :], size=self.D, replace=True) for i in range(self.I)]).reshape(self.I, self.D)
            embryos = np.array([[pop[crossover[i, d], d] for d in range(self.D)] for i in range(self.I)]).reshape(self.I, self.D)
            # Mutation rate is fixed at 1/D for the sake of this MWE
            mutations = np.random.binomial(size=self.I * self.D, n=1, p=1 / self.D).reshape(self.I, self.D)
            # "Mutated embryos" become fully fledged individuals and replace the parent generation
            pop = (1 - mutations) * embryos + mutations * (1 - embryos)

        # Individuals are aggregated gene-wise, with the average of active and inactive genes creating a ratio
        return pop.mean(axis=0)

    def pga(self):
        '''Multiple parallel genetic algorithms'''
        universes = np.zeros(self.U * self.D).reshape(self.U, self.D)
        if self.run_parallel:
            pool_size = min(mp.cpu_count(), self.U)
            p = mp.Pool(pool_size, initializer=init_pool)
            results = p.starmap(GeneticAlgorithm.sga, zip(repeat(self, self.U)))
            p.close()
            p.join()
            for u in range(self.U):
                universes[u, :] = results[u]
        else:
            for u in range(self.U):
                universes[u, :] = self.sga()

        # Multiple GAs are aggregated in a sort of "mean of means"
        return universes.mean(axis=0)

if __name__ == '__main__':
    # Dummy test to check if the code runs... it doesn't :(
    ga = GeneticAlgorithm(I=10, G=3, D=5, U=10, fitness_function=my_fitness_function, run_parallel=True)
    print(ga.pga())

印刷:

[0.56 0.46 0.38 0.54 0.52]
于 2021-06-19T14:22:08.640 回答