我最近正在学习 Akka Actor。我在 Actor 中阅读了调度员的文档。我很好奇演员中的阻塞操作。文档中的最后一个主题描述了如何解决该问题。我正在尝试重现文档中的示例实验。
这是我的代码:
package dispatcher
import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactory
object Main extends App{
var config = ConfigFactory.parseString(
"""
|my-dispatcher{
|type = Dispatcher
|
|executor = "fork-join-executor"
|
|fork-join-executor{
|fixed-pool-size = 32
|}
|throughput = 1
|}
""".stripMargin)
// val system = ActorSystem("block", ConfigFactory.load("/Users/jiexray/IdeaProjects/ActorDemo/application.conf"))
val system = ActorSystem("block")
val actor1 = system.actorOf(Props(new BlockingFutureActor()))
val actor2 = system.actorOf(Props(new PrintActor()))
for(i <- 1 to 1000){
actor1 ! i
actor2 ! i
}
}
package dispatcher
import akka.actor.Actor
import scala.concurrent.{ExecutionContext, Future}
class BlockingFutureActor extends Actor{
override def receive: Receive = {
case i: Int =>
Thread.sleep(5000)
implicit val excutionContext: ExecutionContext = context.dispatcher
Future {
Thread.sleep(5000)
println(s"Blocking future finished ${i}")
}
}
}
package dispatcher
import akka.actor.Actor
class PrintActor extends Actor{
override def receive: Receive = {
case i: Int =>
println(s"PrintActor: ${i}")
}
}
我只是使用默认调度程序创建一个ActorSystem
,所有参与者都依赖于这些调度程序。有BlockingFutureActor
一个封装在Future
. 这PrintActor
只是立即打印一个数字。
在文档的解释中,默认的 dispatchers 会被 Future
s占用BlockingFutureActor
,导致 .s 的消息阻塞PrintActor
。该应用程序卡在某个地方,例如:
> PrintActor: 44
> PrintActor: 45
不幸的是,我的代码没有被阻止。所有输出PrintActor
都顺利显示。但是出现的输出BlockingFutureActor
就像挤牙膏一样。我尝试通过 Intellij 的 Debug 监控我的线程信息,我得到:
您可能会发现只有两个调度员在睡觉(BlockingFutureActor
导致这种情况发生)。其他人正在等待,这意味着他们可以进行新消息传递。
我已经阅读了关于 Actor( page )中阻塞操作的答案。引用说“调度程序实际上是线程池。将两者分开可以保证缓慢的阻塞操作不会使另一个饿死。这种方法通常被称为批量标题,因为它的想法是如果应用程序的一部分发生故障,其余部分仍保持响应。”
默认调度程序是否会为阻塞操作保留一些调度程序?这样即使有很多阻塞操作要求调度程序,系统也可以处理消息。
Akka文档中的实验可以复现吗?是不是我的配置有问题。
感谢您的建议。最良好的祝愿。