5

我正在使用 Scala 和 Akka 进行人工生命模拟,到目前为止,我对两者都非常满意。我在时间方面遇到了一些问题,但是我无法完全解释。

目前,我模拟中的每只动物都是一对演员(动物+大脑)。通常,这两个参与者轮流进行(动物将传感器输入发送到大脑,等待结果,对其采取行动并重新开始)。然而,动物时不时地需要相互互动才能互相吃掉或繁殖。

对我来说奇怪的一件事是时机。事实证明,从一种动物向另一种动物发送信息比从动物向大脑发送信息要慢很多(大约 100 倍)。与素食者和无性动物相比,这使我可怜的捕食者和性活跃的动物处于不利地位(免责声明:我自己也是素食主义者,但我认为成为素食者有更好的理由,而不是在尝试打猎时陷入困境。 .)。

我提取了一个演示问题的最小代码片段:

package edu.blindworld.test

import java.util.concurrent.TimeUnit

import akka.actor.{ActorRef, ActorSystem, Props, Actor}
import akka.pattern.ask
import akka.util.Timeout

import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.util.Random

class Animal extends Actor {
  val brain = context.actorOf(Props(classOf[Brain]))
  var animals: Option[List[ActorRef]] = None

  var brainCount = 0
  var brainRequestStartTime = 0L
  var brainNanos = 0L

  var peerCount = 0
  var peerRequestStartTime = 0L
  var peerNanos = 0L

  override def receive = {
    case Go(all) =>
      animals = Some(all)
      performLoop()
    case BrainResponse =>
      brainNanos += (System.nanoTime() - brainRequestStartTime)
      brainCount += 1
      // Animal interactions are rare
      if (Random.nextDouble() < 0.01) {
        // Send a ping to a random other one (or ourselves). Defer our own loop
        val randomOther = animals.get(Random.nextInt(animals.get.length))
        peerRequestStartTime = System.nanoTime()
        randomOther ! PeerRequest
      } else {
        performLoop()
      }
    case PeerResponse =>
      peerNanos += (System.nanoTime() - peerRequestStartTime)
      peerCount += 1
      performLoop()
    case PeerRequest =>
      sender() ! PeerResponse
    case Stop =>
      sender() ! StopResult(brainCount, brainNanos, peerCount, peerNanos)
      context.stop(brain)
      context.stop(self)
  }

  def performLoop() = {
    brain ! BrainRequest
    brainRequestStartTime = System.nanoTime()
  }
}

class Brain extends Actor {
  override def receive = {
    case BrainRequest =>
      sender() ! BrainResponse
  }
}

case class Go(animals: List[ActorRef])
case object Stop
case class StopResult(brainCount: Int, brainNanos: Long, peerCount: Int, peerNanos: Long)

case object BrainRequest
case object BrainResponse

case object PeerRequest
case object PeerResponse

object ActorTest extends App {
  println("Sampling...")
  val system = ActorSystem("Test")
  val animals = (0 until 50).map(i => system.actorOf(Props(classOf[Animal]))).toList
  animals.foreach(_ ! Go(animals))
  Thread.sleep(5000)
  implicit val timeout = Timeout(5, TimeUnit.SECONDS)
  val futureStats = animals.map(_.ask(Stop).mapTo[StopResult])
  val stats = futureStats.map(Await.result(_, Duration(5, TimeUnit.SECONDS)))
  val brainCount = stats.foldLeft(0)(_ + _.brainCount)
  val brainNanos = stats.foldLeft(0L)(_ + _.brainNanos)
  val peerCount = stats.foldLeft(0)(_ + _.peerCount)
  val peerNanos = stats.foldLeft(0L)(_ + _.peerNanos)
  println("Average time for brain request: " + (brainNanos / brainCount) / 1000000.0 + "ms (sampled from " + brainCount + " requests)")
  println("Average time for peer pings: " + (peerNanos / peerCount) / 1000000.0 + "ms (sampled from " + peerCount + " requests)")
  system.shutdown()
}

这就是这里发生的事情:

  • 我正在创造 50 对动物/大脑演员
  • 它们都启动并运行 5 秒
  • 每只动物做一个无限循环,轮流用它的大脑
  • 在所有运行的 1% 中,一只动物向随机的其他动物发送 ping 并等待其回复。然后,它用它的大脑继续它的循环
  • 对大脑和对等节点的每个请求都会被测量,这样我们就可以得到一个平均值
  • 5 秒后,一切都停止,并比较大脑请求和对同伴的 ping 时间

在我的双核 i7 上,我看到了这些数字:

大脑请求的平均时间:0.004708ms(从 21073859 个请求中采样)

对等 ping 的平均时间:0.66866 毫秒(从 211167 个请求中采样)

因此,对同伴的 ping 比对大脑的请求慢 165 倍。我一直在尝试很多事情来解决这个问题(例如优先邮箱和预热 JIT),但无法弄清楚发生了什么。有人有想法吗?

4

1 回答 1

0

我认为您应该使用询问模式来处理消息。在您的代码中,BrainRequest 被发送到大脑 actor,然后它发送回 BrainResponse。问题就在这里。BrainResponse 不是 BrainRequest 的响应。也许这是之前 BrainRequest 的回应。

以下代码使用 ask 模式,性能结果几乎相同。

package edu.blindworld.test

import java.util.concurrent.TimeUnit

import akka.actor.{ActorRef, ActorSystem, Props, Actor}
import akka.pattern.ask
import akka.util.Timeout

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Random

class Animal extends Actor {
  val brain = context.actorOf(Props(classOf[Brain]))
  var animals: Option[List[ActorRef]] = None

  var brainCount = 0
  var brainRequestStartTime = 0L
  var brainNanos = 0L

  var peerCount = 0
  var peerRequestStartTime = 0L
  var peerNanos = 0L

  override def receive = {
    case Go(all) =>
      animals = Some(all)
      performLoop()
    case PeerRequest =>
      sender() ! PeerResponse
    case Stop =>
      sender() ! StopResult(brainCount, brainNanos, peerCount, peerNanos)
      context.stop(brain)
      context.stop(self)
  }

  def performLoop(): Unit = {
    brainRequestStartTime = System.nanoTime()
    brain.ask(BrainRequest)(10.millis) onSuccess {
      case _ =>
        brainNanos += (System.nanoTime() - brainRequestStartTime)
        brainCount += 1
        // Animal interactions are rare
        if (Random.nextDouble() < 0.01) {
          // Send a ping to a random other one (or ourselves). Defer our own loop
          val randomOther = animals.get(Random.nextInt(animals.get.length))
          peerRequestStartTime = System.nanoTime()
          randomOther.ask(PeerRequest)(10.millis) onSuccess {
            case _ =>
              peerNanos += (System.nanoTime() - peerRequestStartTime)
              peerCount += 1
              performLoop()
          }
        } else {
          performLoop()
        }
    }
  }
}

class Brain extends Actor {
  override def receive = {
    case BrainRequest =>
      sender() ! BrainResponse
  }
}

case class Go(animals: List[ActorRef])
case object Stop
case class StopResult(brainCount: Int, brainNanos: Long, peerCount: Int, peerNanos: Long)

case object BrainRequest
case object BrainResponse

case object PeerRequest
case object PeerResponse

object ActorTest extends App {
  println("Sampling...")
  val system = ActorSystem("Test")
  val animals = (0 until 50).map(i => system.actorOf(Props(classOf[Animal]))).toList
  animals.foreach(_ ! Go(animals))
  Thread.sleep(5000)
  implicit val timeout = Timeout(5, TimeUnit.SECONDS)
  val futureStats = animals.map(_.ask(Stop).mapTo[StopResult])
  val stats = futureStats.map(Await.result(_, Duration(5, TimeUnit.SECONDS)))
  val brainCount = stats.foldLeft(0)(_ + _.brainCount)
  val brainNanos = stats.foldLeft(0L)(_ + _.brainNanos)
  val peerCount = stats.foldLeft(0)(_ + _.peerCount)
  val peerNanos = stats.foldLeft(0L)(_ + _.peerNanos)
  println("Average time for brain request: " + (brainNanos / brainCount) / 1000000.0 + "ms (sampled from " + brainCount + " requests)")
  println("Average time for peer pings: " + (peerNanos / peerCount) / 1000000.0 + "ms (sampled from " + peerCount + " requests)")
  system.shutdown()
}
于 2015-05-24T11:20:53.653 回答