我们发现了一个问题,如果一组共享 BalancingDispatcher 的 Actor 没有收到其他消息,他们就会停止接收 ReceiveTimeout 消息。起初,组中的每个 Actor 都按预期收到 ReceiveTimeout,但获得它们的 Actor 数量很快就会下降,直到只有一个 Actor 获得它。

如果有其他消息进来,它似乎可以防止这种情况发生。只有当 Actor 循环一段时间后,除了 ReceiveTimeout 之外什么都没有。


// Akka version 2.1.1, Scala version 2.10.0

import akka.actor._
import scala.concurrent.duration._
import scala.collection.mutable

case class TimeoutReceived(actor: String, timestamp: Long)

class TimeoutPool(system: ActorSystem) {
  val timeouts = new mutable.MutableList[TimeoutReceived]

  class TimeoutActor extends Actor {
    override def preStart() = {
      context.setReceiveTimeout(100 milliseconds)
    def receive = {
      case ReceiveTimeout =>
        println(System.currentTimeMillis() + " ReceiveTimeout " + self.path)
        timeouts += TimeoutReceived(self.path.name, System.currentTimeMillis())

  val actors: Iterable[ActorRef] =
    for (x <- (0 to 9).toList) yield {
      system.actorOf(Props(() => new TimeoutActor, "dispatcher"),
                     "example.actor" + x)

因此,您可以使用类似的配置启动一个"dispatcher.type = BalancingDispatcher"并观察 println 输出。不久之后,只有一个 Actor 会发出“ReceiveTimeout”输出。

这是一个演示问题的测试类,并且还显示了 Actors 没有被关闭:

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory
import org.scalatest.FunSpec
import org.scalatest.matchers.ShouldMatchers

class ExampleSpec extends FunSpec with ShouldMatchers {
  describe("Balancing dispatcher example") {
    it("actors sharing dispatcher stop getting ReceiveTimeouts (except for one)") {
      val system = ActorSystem("TimeoutExample", ConfigFactory.parseString("dispatcher.type = BalancingDispatcher"))
      val pool = new TimeoutPool(system)
      // spin until we've accumulated 50 timeouts
      while(pool.timeouts.length < (50)) { Thread.sleep(500) }
      // grab the last cycle of ten that we recorded
      val lastTenTimeouts = pool.timeouts.toList.drop(40).take(10).map(_.actor.takeRight(1))

      // have the actors shut down?  No:
      pool.actors.forall(_.isTerminated == false) should be (true) // will pass

      // did each of the 10 actors get a timeout in the last cycle?
      lastTenTimeouts.distinct should have size(10) // will fail with size 1 or 2.



这是 Akka 中的错误,还是出于某种原因将 ReceiveTimeouts 与 BalancingDispatcher 一起使用是无效的?


在 Akka 中,共享一个 BalancingDispatcher 的所有参与者也将共享一个邮箱,这可能导致您在上面描述的情况。

ScalaDoc 中所述的 BalancingDispatcher



