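I am very new to Akka with Scala and I am getting an error message that I don't understand. My code basically runs an evolutionary algorithm. Every new individual created in a generation has to be evaluated, and the evaluation of these individuals is what I want to parallelize (in the long run, I would like to do this on a cluster). To achieve this, I have modeled my code closely on the getting-started tutorial on the Akka website. The code looks like this: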
object Configuration {
  sealed trait Message
  case object Broadcast extends Message
  case class Work(inds:ArrayBuffer[Individual]) extends Message
  case class Result(result:ArrayBuffer[Individual]) extends Message
  case class NewIndividuals(inds:ArrayBuffer[Individual]) extends Message

  class Worker extends Actor {
    val id = (Math.random * 100).asInstanceOf[Int]
    def receive = {
      case Work(inds) =>
        sender ! Result(geneticOperation(inds)) // perform the work
    }
    private def geneticOperation(inds:ArrayBuffer[Individual]) = {
      for(child <- inds) {
        child.mutate
        child.evaluate
        println(id + ": I've evaluated one.")
      }
      println("\n")
      inds
    }
  }

  class Master(nrOfWorkers:Int, nrOfMessages:Int, oldInds:ArrayBuffer[Individual], listener:ActorRef)
    extends Actor {
    var inds:ArrayBuffer[Individual] = _
    var nrOfResults:Int = _
    val start:Long = System.currentTimeMillis
    val workerRouter = context.actorOf(
      Props[Worker].withRouter(RoundRobinRouter(nrOfWorkers)), name = "workerRouter")
    def receive = {
      case Broadcast =>
        var workOn = new ArrayBuffer[Individual]
        for(i <- 0 until nrOfMessages)
          workOn += oldInds(i)
        oldInds.trimStart(nrOfMessages)
        workerRouter ! Work(workOn)
      case Result(newInds) =>
        inds ++= newInds
        nrOfResults += 1
        println(nrOfResults)
        if(nrOfResults == nrOfMessages) {
          listener ! NewIndividuals(inds)
          context.stop(self)
        }
    }
  }

  class Listener extends Actor {
    def receive = {
      case NewIndividuals(inds) =>
        Configuration.environment(inds)
        context.system.shutdown()
    }
  }

  var d = Parameter.N_DROPLET_TYPE-2

  /** Apply genetic operators (recombination, mutation) on the population
   * during the current generation.
   * @param g Current generation.
   */
  def apply(g:Int) {
    var children = Population.recombinate // create offspring
    calculate(Parameter.NODES,(Parameter.L / Parameter.NODES),children)
  }

  def calculate(nrOfWorkers:Int, nrOfMessages:Int, inds:ArrayBuffer[Individual]) {
    val system = ActorSystem("GeneticOperations")
    val listener = system.actorOf(Props[Listener], name="listener")
    val master = system.actorOf(Props(new Master(
      nrOfWorkers,nrOfMessages,inds,listener)),
      name="master")
    master ! Broadcast
  }

  private def environment(children:ArrayBuffer[Individual]) {
    if(Parameter.NEUTRAL) { // if neutral evolution no selection pressure
      Population.neutralEvolution(children)
      writeDiversity
    }
    else Population.select(children)
    writeAllToEvoPath
  }
}
When I run it, I get the following output:
ID akka
Population setup complete.
Evolution: Gen 1
Gen 2
Gen 3
Gen 4
Gen 5
Gen 6
Gen 7
Gen 8
Gen 9
107 seconds elapsed.
79: I've evaluated one.
67: I've evaluated one.
89: I've evaluated one.
56: I've evaluated one.
17: I've evaluated one.
55: I've evaluated one.
77: I've evaluated one.
15: I've evaluated one.
57: I've evaluated one.
79: I've evaluated one.
[ERROR] [05/03/2012 12:15:01.864] [GeneticOperations-akka.actor.default-dispatcher-6] [akka://GeneticOperations/user/master] null
java.lang.NullPointerException
at framework.Configuration$Master$$anonfun$receive$2.apply(Configuration.scala:75)
at framework.Configuration$Master$$anonfun$receive$2.apply(Configuration.scala:67)
at akka.actor.Actor$class.apply(Actor.scala:311)
at framework.Configuration$Master.apply(Configuration.scala:57)
at akka.actor.ActorCell.invoke(ActorCell.scala:619)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:196)
at akka.dispatch.Mailbox.run(Mailbox.scala:178)
at akka.dispatch.ForkJoinExecutorConfigurator$MailboxExecutionTask.exec(AbstractDispatcher.scala:505)
at akka.jsr166y.ForkJoinTask.doExec(ForkJoinTask.java:259)
at akka.jsr166y.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:974)
at akka.jsr166y.ForkJoinPool.runWorker(ForkJoinPool.java:1478)
at akka.jsr166y.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:104)
55: I've evaluated one.
[ERROR] [05/03/2012 12:15:04.339] [GeneticOperations-akka.actor.default-dispatcher-4] [akka://GeneticOperations/user/master] null
java.lang.NullPointerException
at framework.Configuration$Master$$anonfun$receive$2.apply(Configuration.scala:75)
at framework.Configuration$Master$$anonfun$receive$2.apply(Configuration.scala:67)
at akka.actor.Actor$class.apply(Actor.scala:311)
at framework.Configuration$Master.apply(Configuration.scala:57)
at akka.actor.ActorCell.invoke(ActorCell.scala:619)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:196)
at akka.dispatch.Mailbox.run(Mailbox.scala:178)
at akka.dispatch.ForkJoinExecutorConfigurator$MailboxExecutionTask.exec(AbstractDispatcher.scala:505)
at akka.jsr166y.ForkJoinTask.doExec(ForkJoinTask.java:259)
at akka.jsr166y.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:974)
at akka.jsr166y.ForkJoinPool.runWorker(ForkJoinPool.java:1478)
at akka.jsr166y.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:104)
(and so on)
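For every generation (Gen) it should run the evaluation, but instead my code does not seem to wait for the Master actor to receive the results and simply carries on. The error message looks to me like a NullPointerException in the Master, but I don't understand why this happens. The Akka tutorial claims that it waits until the Master has received all the results from the Workers (which is what I want).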
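To make the "wait for all results" behaviour concrete: a message sent with `!` is fire-and-forget, so `calculate` above returns as soon as `Broadcast` has been sent. The following is only a minimal sketch of blocking until every batch has been evaluated. It uses plain `scala.concurrent` futures instead of Akka actors, and `evaluate`, `calculate`, and the `Int` batches are hypothetical stand-ins, not the code from this post.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object WaitForResultDemo {
  // Hypothetical stand-in for mutating and evaluating one batch of individuals.
  def evaluate(batch: Seq[Int]): Seq[Int] = batch.map(_ * 2)

  // Starts one background task per batch and returns immediately with a Future
  // that completes once every batch has been evaluated.
  def calculate(batches: Seq[Seq[Int]]): Future[Seq[Int]] =
    Future.traverse(batches)(b => Future(evaluate(b))).map(_.flatten)

  def main(args: Array[String]): Unit = {
    val pending = calculate(Seq(Seq(1, 2), Seq(3, 4)))
    // Without this call the caller would carry on before the work is done.
    val evaluated = Await.result(pending, 10.seconds)
    println(evaluated)
  }
}
```

The point of the sketch is only that the caller has to wait explicitly for a completion signal; starting the work does not by itself pause the generation loop.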
Any help would be greatly appreciated! (Please note that I really am a beginner with Akka.)
Update:
1) framework.Configuration$Master$$anonfun$receive$2.apply(Configuration.scala:75) is the line: inds ++= newInds (in Master, receive, case Result)
2) The variables in Parameter are just integers; in this case NODES = 4 and L = 8.
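Regarding point 1, here is a minimal sketch of how a field declared with `= _` behaves in Scala 2. A reference-typed field initialised this way starts out as `null`, so calling `++=` on it throws a NullPointerException. The `Accumulator` class and `appendFails` helper are hypothetical stand-ins for illustration, not the Master from this post.

```scala
import scala.collection.mutable.ArrayBuffer

object NullFieldDemo {
  // Hypothetical stand-in for the Master's state, not the original class.
  class Accumulator {
    var unset: ArrayBuffer[Int] = _                      // reference-typed field defaults to null
    var ready: ArrayBuffer[Int] = ArrayBuffer.empty[Int] // starts out as an empty buffer
  }

  // Returns true if appending to the given buffer throws a NullPointerException.
  def appendFails(buf: ArrayBuffer[Int]): Boolean =
    try { buf ++= Seq(1, 2); false }
    catch { case _: NullPointerException => true }

  def main(args: Array[String]): Unit = {
    val acc = new Accumulator
    println(s"unset is null: ${acc.unset == null}")
    println(s"append to unset fails: ${appendFails(acc.unset)}")
    println(s"append to ready fails: ${appendFails(acc.ready)}")
    println(s"ready now holds: ${acc.ready}")
  }
}
```

If this is what is happening on line 75, initialising the field with an empty ArrayBuffer instead of `_` should avoid the exception.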