0

我最近正在学习 Akka Actor。我在 Actor 中阅读了调度员的文档。我很好奇演员中的阻塞操作。文档中的最后一个主题描述了如何解决该问题。我正在尝试重现文档中的示例实验。

这是我的代码:

package dispatcher

import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactory

object Main extends App{

  var config = ConfigFactory.parseString(
    """
      |my-dispatcher{
      |type = Dispatcher
      |
      |executor = "fork-join-executor"
      |
      |fork-join-executor{
      |fixed-pool-size = 32
      |}
      |throughput = 1
      |}
    """.stripMargin)

//  val system = ActorSystem("block", ConfigFactory.load("/Users/jiexray/IdeaProjects/ActorDemo/application.conf"))


  val system = ActorSystem("block")


  val actor1 = system.actorOf(Props(new BlockingFutureActor()))
  val actor2 = system.actorOf(Props(new PrintActor()))

  for(i <- 1 to 1000){
    actor1 ! i
    actor2 ! i
  }

}

package dispatcher

import akka.actor.Actor

import scala.concurrent.{ExecutionContext, Future}

class BlockingFutureActor extends Actor{
  override def receive: Receive = {
    case i: Int =>
      Thread.sleep(5000)
      implicit val excutionContext: ExecutionContext = context.dispatcher
      Future {
        Thread.sleep(5000)
        println(s"Blocking future finished ${i}")
      }
  }
}
package dispatcher

import akka.actor.Actor

class PrintActor extends Actor{
  override def receive: Receive = {
    case i: Int =>
      println(s"PrintActor: ${i}")
  }
}

我只是使用默认调度程序创建一个ActorSystem,所有参与者都依赖于这些调度程序。有BlockingFutureActor一个封装在Future. 这PrintActor只是立即打印一个数字。

在文档的解释中,默认的 dispatchers 会被 Futures占用BlockingFutureActor,导致 .s 的消息阻塞PrintActor。该应用程序卡在某个地方,例如:

> PrintActor: 44
> PrintActor: 45

不幸的是,我的代码没有被阻止。所有输出PrintActor都顺利显示。但是出现的输出BlockingFutureActor就像挤牙膏一样。我尝试通过 Intellij 的 Debug 监控我的线程信息,我得到: 线程监控

您可能会发现只有两个调度员在睡觉(BlockingFutureActor导致这种情况发生)。其他人正在等待,这意味着他们可以进行新消息传递。

我已经阅读了关于 Actor( page )中阻塞操作的答案。引用说“调度程序实际上是线程池。将两者分开可以保证缓慢的阻塞操作不会使另一个饿死。这种方法通常被称为批量标题,因为它的想法是如果应用程序的一部分发生故障,其余部分仍保持响应。”

默认调度程序是否会为阻塞操作保留一些调度程序?这样即使有很多阻塞操作要求调度程序,系统也可以处理消息。

Akka文档中的实验可以复现吗?是不是我的配置有问题。

感谢您的建议。最良好的祝愿。

4

1 回答 1

2

PrintActor您在 ' 的任何打印语句之前看到所有 1000 个打印语句的原因BlockingFutureActor是因为'块Thread.sleep中的第一次调用。这是您的代码与官方文档中的示例之间的主要区别:BlockingFutureActorreceiveThread.sleep

override def receive: Receive = {
  case i: Int =>
    Thread.sleep(5000) // <----- this call is not in the example in the official docs
    implicit val excutionContext: ExecutionContext = context.dispatcher
    Future {
      ...
    }
}

请记住,参与者一次处理一条消息。Thread.sleep(5000)基本上模拟一条至少需要五秒钟才能处理的消息。在BlockingFutureActor处理完当前消息之前,它不会处理另一条消息,即使它的邮箱中有数百条消息。在BlockingFutureActor处理第一Int条 value 消息时1PrintActor已经完成了对发送给它的所有 1000 条消息的处理。为了更清楚地说明这一点,让我们添加一个println语句:

override def receive: Receive = {
  case i: Int =>
    println(s"Entering BlockingFutureActor's receive: $i") // <-----
    Thread.sleep(5000)
    implicit val excutionContext: ExecutionContext = context.dispatcher
    Future {
      ...
    }
}

运行程序时的示例输出:

Entering BlockingFutureActor's receive: 1
PrintActor: 1
PrintActor: 2
PrintActor: 3
...
PrintActor: 1000
Entering BlockingFutureActor's receive: 2
Entering BlockingFutureActor's receive: 3
Blocking future finished 1
...

如您所见,当BlockingFutureActor实际开始处理消息2时,PrintActor已经处理了所有 1000 条消息。

如果您先删除它Thread.sleep,那么您会BlockingFutureActor更快地看到从 的邮箱中出列的消息,因为该工作被“委托”给Future. 创建完成Future后,actor 从其邮箱中获取下一条消息,而无需等待Future完成。下面是一个没有第一个的示例输出Thread.sleep(每次运行它都不会完全相同):

Entering BlockingFutureActor's receive: 1
PrintActor: 1
PrintActor: 2
...
PrintActor: 84
PrintActor: 85
Entering BlockingFutureActor's receive: 2
Entering BlockingFutureActor's receive: 3
Entering BlockingFutureActor's receive: 4
Entering BlockingFutureActor's receive: 5
PrintActor: 86
PrintActor: 87
...
于 2017-08-05T13:57:41.910 回答