1

我对 Scala 的 Akka 非常陌生,我收到一条错误消息,我不明白。我的代码基本上运行的是进化算法。对于在一代人中创造的每个新个体,都必须对其进行评估。对这些人的评估是我想要并行化的(从长远来看,我想在集群上这样做)。为了实现这一点,我已经完全将我的代码导向了 Akka 网站上的入门教程。代码如下所示:

object Configuration {

sealed trait Message
case object Broadcast extends Message
case class Work(inds:ArrayBuffer[Individual]) extends Message
case class Result(result:ArrayBuffer[Individual]) extends Message
case class NewIndividuals(inds:ArrayBuffer[Individual]) extends Message

class Worker extends Actor {

    val id = (Math.random * 100).asInstanceOf[Int]

    def receive = {
        case Work(inds) =>
            sender ! Result(geneticOperation(inds)) // perform the work
    }

    private def geneticOperation(inds:ArrayBuffer[Individual]) = {
        for(child <- inds) {
            child.mutate
            child.evaluate
            println(id + ": I've evaluated one.")
        }
        println("\n")
        inds
    }

}

class Master(nrOfWorkers:Int, nrOfMessages:Int, oldInds:ArrayBuffer[Individual], listener:ActorRef)
    extends Actor {

    var inds:ArrayBuffer[Individual] = _ 
    var nrOfResults:Int = _
    val start:Long = System.currentTimeMillis

    val workerRouter = context.actorOf(
            Props[Worker].withRouter(RoundRobinRouter(nrOfWorkers)), name = "workerRouter")

    def receive = {
        case Broadcast =>
            var workOn = new ArrayBuffer[Individual]    
            for(i <- 0 until nrOfMessages)
                workOn += oldInds(i)
            oldInds.trimStart(nrOfMessages)
            workerRouter ! Work(workOn)
        case Result(newInds) =>
            inds ++= newInds
            nrOfResults += 1
            println(nrOfResults)
            if(nrOfResults == nrOfMessages) {
                listener ! NewIndividuals(inds)
                context.stop(self)
            }
    }

}

class Listener extends Actor {
    def receive = {
        case NewIndividuals(inds) =>
            Configuration.environment(inds)
            context.system.shutdown()
    }
}

var d = Parameter.N_DROPLET_TYPE-2

  /** Apply genetic operators (recombination, mutation) on the population 
   * during the current generation.
   *  @param g Current generation.
   */
def apply(g:Int) {
    var children = Population.recombinate   // create offspring
    calculate(Parameter.NODES,(Parameter.L / Parameter.NODES),children)
}

def calculate(nrOfWorkers:Int, nrOfMessages:Int, inds:ArrayBuffer[Individual]) {
    val system = ActorSystem("GeneticOperations")
    val listener = system.actorOf(Props[Listener], name="listener")
    val master = system.actorOf(Props(new Master(
            nrOfWorkers,nrOfMessages,inds,listener)),
            name="master")
    master ! Broadcast
}

    private def environment(children:ArrayBuffer[Individual]) {
      if(Parameter.NEUTRAL) {       // if neutral evolution no selection pressure
          Population.neutralEvolution(children)
          writeDiversity
      }
      else Population.select(children)
      writeAllToEvoPath
}

当我运行它时,我得到以下输出:

ID akka
Population setup complete.
Evolution: Gen 1
Gen 2
Gen 3
Gen 4
Gen 5
Gen 6
Gen 7
Gen 8
Gen 9


107 seconds elapsed.
79: I've evaluated one.
67: I've evaluated one.
89: I've evaluated one.
56: I've evaluated one.
17: I've evaluated one.
55: I've evaluated one.
77: I've evaluated one.
15: I've evaluated one.
57: I've evaluated one.
79: I've evaluated one.


[ERROR] [05/03/2012 12:15:01.864] [GeneticOperations-akka.actor.default-dispatcher-6] [akka://GeneticOperations/user/master] null
java.lang.NullPointerException
at framework.Configuration$Master$$anonfun$receive$2.apply(Configuration.scala:75)
at framework.Configuration$Master$$anonfun$receive$2.apply(Configuration.scala:67)
at akka.actor.Actor$class.apply(Actor.scala:311)
at framework.Configuration$Master.apply(Configuration.scala:57)
at akka.actor.ActorCell.invoke(ActorCell.scala:619)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:196)
at akka.dispatch.Mailbox.run(Mailbox.scala:178)
at akka.dispatch.ForkJoinExecutorConfigurator$MailboxExecutionTask.exec(AbstractDispatcher.scala:505)
at akka.jsr166y.ForkJoinTask.doExec(ForkJoinTask.java:259)
at akka.jsr166y.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:974)
at akka.jsr166y.ForkJoinPool.runWorker(ForkJoinPool.java:1478)
at akka.jsr166y.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:104)

55: I've evaluated one.


[ERROR] [05/03/2012 12:15:04.339] [GeneticOperations-akka.actor.default-dispatcher-4] [akka://GeneticOperations/user/master] null
java.lang.NullPointerException
at framework.Configuration$Master$$anonfun$receive$2.apply(Configuration.scala:75)
at framework.Configuration$Master$$anonfun$receive$2.apply(Configuration.scala:67)
at akka.actor.Actor$class.apply(Actor.scala:311)
at framework.Configuration$Master.apply(Configuration.scala:57)
at akka.actor.ActorCell.invoke(ActorCell.scala:619)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:196)
at akka.dispatch.Mailbox.run(Mailbox.scala:178)
at akka.dispatch.ForkJoinExecutorConfigurator$MailboxExecutionTask.exec(AbstractDispatcher.scala:505)
at akka.jsr166y.ForkJoinTask.doExec(ForkJoinTask.java:259)
at akka.jsr166y.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:974)
at akka.jsr166y.ForkJoinPool.runWorker(ForkJoinPool.java:1478)
at akka.jsr166y.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:104)

(等等)

对于每一代(gen),它应该进行评估,但相反,我的代码似乎没有等待 Master Actor 获得结果,而是继续进行。错误消息在我看来就像 Master 上有 NullPointerException?但我不明白为什么会发生这种情况。Akka 教程声称它会等到 Master 收到来自 Workers 的所有结果(这是我想要的)。

任何有帮助的东西都将不胜感激!(请注意,我真的是 Akka 的初学者。)

更新:

1) framework.Configuration$Master$$anonfun$receive$2.apply(Configuration.scala:75): inds ++= newInds (在Master, receive, case Result)

2)Parameter中的变量只是整数,在这种情况下NODES = 4,L = 8

4

0 回答 0