0

我在 Akka 远程处理中面临性能问题。我有 2 个演员Actor1Actor2. Actor 之间发送的消息是来自to的同步请求请求和来自Actor1toActor2的响应。下面是我的 Actor 的示例代码片段和配置:Actor2Actor1

Actor1.java:

object Actor1 extends App {
  val conf = ConfigFactory.load()
  val system = ActorSystem("testSystem1", conf.getConfig("remote1"))
  val actor = system.actorOf(Props[Actor1].withDispatcher("my-dispatcher"), "actor1")
  implicit val timeOut: Timeout = Timeout(10 seconds)

  class Actor1 extends Actor {
    var value = 0
    var actorRef: ActorRef = null

    override def preStart(): Unit = {
      println(self.path)
    }

    override def receive: Receive = {
      case "register" =>
        actorRef = sender()
        println("Registering the actor")
        val time = System.currentTimeMillis()
        (1 to 300000).foreach(value => {
          if (value % 10000 == 0) {
            println("message count -- " + value + " --- time taken - " + (System.currentTimeMillis() - time))
          }
          Await.result(actorRef ? value, 10 seconds)
        })
        val totalTime = System.currentTimeMillis() - time
        println("Total Time - " + totalTime)
    }
  }

}

Actor2.java:

object Actor2 extends App {
  val conf = ConfigFactory.load()
  val system = ActorSystem("testSystem1", conf.getConfig("remote2"))
  val actor = system.actorOf(Props[Actor2].withDispatcher("my-dispatcher"), "actor2")
  implicit val timeOut: Timeout = Timeout(10 seconds)
  actor ! "send"


  class Actor2 extends Actor {
    var value = 0
    var actorSelection: ActorSelection = context.actorSelection("akka://testSystem1@127.0.0.1:6061/user/actor1")

    override def receive: Receive = {
      case "send" =>
        actorSelection ! "register"
      case int: Int => {
        sender() ! 1
      }
    }
  }


}

应用程序.conf:

remote1 {
  my-dispatcher {
    executor = "thread-pool-executor"
    type = PinnedDispatcher
  }
  akka {
    actor {
      provider = remote
    }
    remote {
      artery {
        transport = tcp # See Selecting a transport below
        canonical.hostname = "127.0.0.1"
        canonical.port = 6061
      }
    }
  }
}

remote2 {
  my-dispatcher {
    executor = "thread-pool-executor"
    type = PinnedDispatcher
  }
  akka {
    actor {
      provider = remote
    }
    remote {
      artery {
        transport = tcp # See Selecting a transport below
        canonical.hostname = "127.0.0.1"
        canonical.port = 6062
      }
    }
  }
}

输出:

message count -- 10000 --- time taken - 5871
message count -- 20000 --- time taken - 9043
message count -- 30000 --- time taken - 12198
message count -- 40000 --- time taken - 15363
message count -- 50000 --- time taken - 18649
message count -- 60000 --- time taken - 22074
message count -- 70000 --- time taken - 25487
message count -- 80000 --- time taken - 28820
message count -- 90000 --- time taken - 32118
message count -- 100000 --- time taken - 35634
message count -- 110000 --- time taken - 39146
message count -- 120000 --- time taken - 42539
message count -- 130000 --- time taken - 45997
message count -- 140000 --- time taken - 50013
message count -- 150000 --- time taken - 53466
message count -- 160000 --- time taken - 57117
message count -- 170000 --- time taken - 61246
message count -- 180000 --- time taken - 65051
message count -- 190000 --- time taken - 68809
message count -- 200000 --- time taken - 72908
message count -- 210000 --- time taken - 77091
message count -- 220000 --- time taken - 80855
message count -- 230000 --- time taken - 84679
message count -- 240000 --- time taken - 89089
message count -- 250000 --- time taken - 93132
message count -- 260000 --- time taken - 97360
message count -- 270000 --- time taken - 101442
message count -- 280000 --- time taken - 105656
message count -- 290000 --- time taken - 109665
message count -- 300000 --- time taken - 113706
Total Time - 113707

我在这里做错了什么吗?有什么观察或建议可以提高性能吗?

4

1 回答 1

0

我在代码中看到的主要问题是 Await.result()。这是一个阻塞操作,很可能会影响性能。

我建议将结果收集在一个固定的数组/列表中,使用整数作为数组,并在收到预期数量的响应时认为它完成。

于 2020-10-29T17:40:21.997 回答