0

我们发现了一个问题,如果一组共享 BalancingDispatcher 的 Actor 没有收到其他消息,他们就会停止接收 ReceiveTimeout 消息。起初,组中的每个 Actor 都按预期收到 ReceiveTimeout,但获得它们的 Actor 数量很快就会下降,直到只有一个 Actor 获得它。

如果有其他消息进来,它似乎可以防止这种情况发生。只有当 Actor 循环一段时间后,除了 ReceiveTimeout 之外什么都没有。

这是一个例子:

// Akka version 2.1.1, Scala version 2.10.0

import akka.actor._
import scala.concurrent.duration._
import scala.collection.mutable

case class TimeoutReceived(actor: String, timestamp: Long)

class TimeoutPool(system: ActorSystem) {
  val timeouts = new mutable.MutableList[TimeoutReceived]

  class TimeoutActor extends Actor {
    override def preStart() = {
      super.preStart()
      context.setReceiveTimeout(100 milliseconds)
    }
    def receive = {
      case ReceiveTimeout =>
        println(System.currentTimeMillis() + " ReceiveTimeout " + self.path)
        timeouts += TimeoutReceived(self.path.name, System.currentTimeMillis())
    }
  }

  val actors: Iterable[ActorRef] =
    for (x <- (0 to 9).toList) yield {
      system.actorOf(Props(() => new TimeoutActor, "dispatcher"),
                     "example.actor" + x)
    }
}

因此,您可以使用类似的配置启动一个"dispatcher.type = BalancingDispatcher"并观察 println 输出。不久之后,只有一个 Actor 会发出“ReceiveTimeout”输出。

这是一个演示问题的测试类,并且还显示了 Actors 没有被关闭:

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory
import org.scalatest.FunSpec
import org.scalatest.matchers.ShouldMatchers

class ExampleSpec extends FunSpec with ShouldMatchers {
  describe("Balancing dispatcher example") {
    it("actors sharing dispatcher stop getting ReceiveTimeouts (except for one)") {
      val system = ActorSystem("TimeoutExample", ConfigFactory.parseString("dispatcher.type = BalancingDispatcher"))
      val pool = new TimeoutPool(system)
      // spin until we've accumulated 50 timeouts
      while(pool.timeouts.length < (50)) { Thread.sleep(500) }
      // grab the last cycle of ten that we recorded
      val lastTenTimeouts = pool.timeouts.toList.drop(40).take(10).map(_.actor.takeRight(1))

      // have the actors shut down?  No:
      pool.actors.forall(_.isTerminated == false) should be (true) // will pass

      // did each of the 10 actors get a timeout in the last cycle?
      lastTenTimeouts.distinct should have size(10) // will fail with size 1 or 2.

      system.shutdown()
    }
  }
}

将“BalancingDispatcher”更改为“Dispatcher”,测试将通过。

这是 Akka 中的错误,还是出于某种原因将 ReceiveTimeouts 与 BalancingDispatcher 一起使用是无效的?

4

1 回答 1

2

在 Akka 中,共享一个 BalancingDispatcher 的所有参与者也将共享一个邮箱,这可能导致您在上面描述的情况。

ScalaDoc 中所述的 BalancingDispatcher

一个基于执行器的事件驱动调度器,它将尝试将工作从忙碌的演员重新分配给空闲的演员。假设使用此调度程序的同一实例的所有参与者都可以处理已发送到其中一个参与者的所有消息。即参与者属于参与者池,并且对于客户端而言,不能保证哪个参与者实例实际处理给定消息。

尽管此实现中使用的技术通常称为“工作窃取”,但实际实现可能最好将其描述为“工作捐赠”,因为工作被窃取的参与者采取了主动。

于 2013-09-17T16:58:39.063 回答