我们发现了一个问题,如果一组共享 BalancingDispatcher 的 Actor 没有收到其他消息,他们就会停止接收 ReceiveTimeout 消息。起初,组中的每个 Actor 都按预期收到 ReceiveTimeout,但获得它们的 Actor 数量很快就会下降,直到只有一个 Actor 获得它。
如果有其他消息进来,它似乎可以防止这种情况发生。只有当 Actor 循环一段时间后,除了 ReceiveTimeout 之外什么都没有。
这是一个例子:
// Akka version 2.1.1, Scala version 2.10.0
import akka.actor._
import scala.concurrent.duration._
import scala.collection.mutable
case class TimeoutReceived(actor: String, timestamp: Long)
class TimeoutPool(system: ActorSystem) {
val timeouts = new mutable.MutableList[TimeoutReceived]
class TimeoutActor extends Actor {
override def preStart() = {
super.preStart()
context.setReceiveTimeout(100 milliseconds)
}
def receive = {
case ReceiveTimeout =>
println(System.currentTimeMillis() + " ReceiveTimeout " + self.path)
timeouts += TimeoutReceived(self.path.name, System.currentTimeMillis())
}
}
val actors: Iterable[ActorRef] =
for (x <- (0 to 9).toList) yield {
system.actorOf(Props(() => new TimeoutActor, "dispatcher"),
"example.actor" + x)
}
}
因此,您可以使用类似的配置启动一个"dispatcher.type = BalancingDispatcher"
并观察 println 输出。不久之后,只有一个 Actor 会发出“ReceiveTimeout”输出。
这是一个演示问题的测试类,并且还显示了 Actors 没有被关闭:
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory
import org.scalatest.FunSpec
import org.scalatest.matchers.ShouldMatchers
class ExampleSpec extends FunSpec with ShouldMatchers {
describe("Balancing dispatcher example") {
it("actors sharing dispatcher stop getting ReceiveTimeouts (except for one)") {
val system = ActorSystem("TimeoutExample", ConfigFactory.parseString("dispatcher.type = BalancingDispatcher"))
val pool = new TimeoutPool(system)
// spin until we've accumulated 50 timeouts
while(pool.timeouts.length < (50)) { Thread.sleep(500) }
// grab the last cycle of ten that we recorded
val lastTenTimeouts = pool.timeouts.toList.drop(40).take(10).map(_.actor.takeRight(1))
// have the actors shut down? No:
pool.actors.forall(_.isTerminated == false) should be (true) // will pass
// did each of the 10 actors get a timeout in the last cycle?
lastTenTimeouts.distinct should have size(10) // will fail with size 1 or 2.
system.shutdown()
}
}
}
将“BalancingDispatcher”更改为“Dispatcher”,测试将通过。
这是 Akka 中的错误,还是出于某种原因将 ReceiveTimeouts 与 BalancingDispatcher 一起使用是无效的?