2

目标

我试图让 DEAP 跨火花集群并行化。我已经看到其他用户引用了这一点,因为它允许通过 yarn 轻松地与现有服务器架构紧密集成。我遵循了参考文献中引用的几个在线教程。我有 deap 的工作代码,然后是我试图转换为使用 spark 的代码。同样的错误“无法在模块 deap.creator 上获取属性个人”是通常发生的错误。

工作代码

import numpy as np
import random

from deap import base
from deap import creator
from deap import tools
from deap import algorithms

creator.create("FitnessMax", base.Fitness, weights=(1.0,))
creator.create("Individual", list, fitness=creator.FitnessMax)

def evalOneMax(individual):
    return sum(individual),

toolbox = base.Toolbox()
toolbox.register("attr_bool", random.randint, 0, 1)
toolbox.register("individual", tools.initRepeat, creator.Individual,
    toolbox.attr_bool, 100)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)
toolbox.register("evaluate", evalOneMax)
toolbox.register("mate", tools.cxTwoPoint)
toolbox.register("mutate", tools.mutFlipBit, indpb=0.05)
toolbox.register("select", tools.selTournament, tournsize=3)

# Define parallelism outside main
if __name__=="__main__":

    pop = toolbox.population(n=300)
    hof = tools.HallOfFame(1)
    stats = tools.Statistics(lambda ind: ind.fitness.values)
    stats.register("avg", np.mean)
    stats.register("std", np.std)
    stats.register("min", np.min)
    stats.register("max", np.max)

    pop, log = algorithms.eaSimple(pop, toolbox, cxpb=0.5, mutpb=0.2, ngen=40,
                                   stats=stats, halloffame=hof, verbose=True)

冉:

python3 test-deap.py

输出

gen nevals  avg     std     min max
0   300     50.5533 4.78893 38  63
1   184     54.2433 3.85627 44  65
2   191     57.2667 3.38756 47  65
3   182     60.0333 3.15154 51  69
4   179     62.35   2.95762 53  71
5   176     64.3267 2.77968 55  73
6   183     66.2467 2.80223 58  75
7   182     68.1733 2.66019 59  79
8   186     69.8367 2.83255 62  77
9   179     71.7233 2.70557 59  78
10  187     73.3667 2.63165 63  80
11  169     74.7933 2.43255 65  80
12  182     76.0967 2.42363 65  82
13  204     77.3    2.36995 67  83
14  203     78.6267 2.31818 70  84
15  182     79.8933 2.29535 72  84
16  183     81.02   2.29483 72  86
17  185     81.87   2.41242 73  87
18  190     83.0633 2.13057 74  87
19  182     84.06   2.16096 75  89
20  194     84.8167 2.41724 77  91
21  174     85.9633 2.22755 79  91
22  180     86.8033 2.23263 79  92
23  177     87.7533 2.3831  78  93
24  174     88.61   2.34334 79  93
25  171     89.6167 2.36144 78  95
26  195     90.57   2.4695  81  95
27  169     91.5233 2.23072 82  96
28  173     92.3733 2.16347 83  97
29  203     93.1    2.13151 85  97
30  179     93.6067 2.41356 84  98
31  169     94.3067 2.23293 86  99
32  184     94.8933 2.49706 85  99
33  175     95.8733 2.14413 88  99
34  168     96.2167 2.30428 88  99
35  173     96.88   2.22537 87  100
36  171     97.33   2.29951 87  100
37  184     97.89   2.05375 91  100
38  175     98.0333 2.52565 88  100
39  176     98.6667 2.07579 90  100
40  175     98.6867 2.32562 91  100

不工作的代码

from pyspark import SparkContext

import numpy as np
import random

from deap import base
from deap import creator
from deap import tools
from deap import algorithms

creator.create("FitnessMax", base.Fitness, weights=(1.0,))
creator.create("Individual", list, fitness=creator.FitnessMax)

def evalOneMax(individual):
    return sum(individual),

toolbox = base.Toolbox()
toolbox.register("attr_bool", random.randint, 0, 1)
toolbox.register("individual", tools.initRepeat, creator.Individual,
    toolbox.attr_bool, 100)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)
toolbox.register("evaluate", evalOneMax)
toolbox.register("mate", tools.cxTwoPoint)
toolbox.register("mutate", tools.mutFlipBit, indpb=0.05)
toolbox.register("select", tools.selTournament, tournsize=3)

# Define parallelism outside main
if __name__=="__main__":
    sc = SparkContext(appName="DEAP")

    def sparkMap(algorithm, population):
        return sc.parallelize(population).map(algorithm).collect()

    toolbox.register("map", sparkMap)

    pop = toolbox.population(n=300)
    hof = tools.HallOfFame(1)
    stats = tools.Statistics(lambda ind: ind.fitness.values)
    stats.register("avg", np.mean)
    stats.register("std", np.std)
    stats.register("min", np.min)
    stats.register("max", np.max)

    pop, log = algorithms.eaSimple(pop, toolbox, cxpb=0.5, mutpb=0.2, ngen=40,
                                   stats=stats, halloffame=hof, verbose=True)

冉:

spark-submit --master local test-deap.py

输出

全文(粘贴)

强调:

Traceback (most recent call last):
  File "/Users/ryapeach/Documents/Workspace/relay-death/test-deap.py", line 45, in <module>
    stats=stats, halloffame=hof, verbose=True)
  File "/usr/local/lib/python3.6/site-packages/deap-1.2.2-py3.6-macosx-10.13-x86_64.egg/deap/algorithms.py", line 150, in eaSimple
    fitnesses = toolbox.map(toolbox.evaluate, invalid_ind)
  File "/Users/ryapeach/Documents/Workspace/relay-death/test-deap.py", line 32, in sparkMap
    return sc.parallelize(population).map(algorithm).collect()
AttributeError: Can't get attribute 'Individual' on <module 'deap.creator' from '/usr/local/lib/python3.6/site-packages/deap-1.2.2-py3.6-macosx-10.13-x86_64.egg/deap/creator.py'>

参考:

4

0 回答 0