你观察到的是预期的行为——当然它是非常糟糕的。存在已知的解决方案和最佳实践来防范它,这很好。在这个答案中,我想花一些时间来简短地、长地、然后深入地解释这个问题——阅读愉快!
简短的回答:“不要阻塞路由基础设施! ”,总是使用专门的调度程序来阻塞操作!
观察到的症状的原因:问题是您将context.dispatcher
阻塞期货用作执行的调度程序。路由基础设施使用同一个调度程序(简单来说只是“一堆线程”)来实际处理传入的请求 - 因此,如果您阻塞所有可用线程,最终会导致路由基础设施挨饿。(有争议和基准测试的事情是如果 Akka HTTP 可以保护它,我会将它添加到我的研究待办事项列表中)。
阻塞必须特别小心,以免影响同一调度程序的其他用户(这就是为什么我们将执行分离到不同的用户变得如此简单的原因),如 Akka 文档部分所述:阻塞需要仔细管理。
我想在这里提请注意的另一件事是,如果可能的话,应该完全避免阻塞 API - 如果您的长时间运行的操作不是真正的一个操作,而是一系列操作,您可以将它们分离到不同的参与者或排序期货。无论如何,只是想指出——如果可能的话,避免这种阻塞调用,但如果你必须这样做——那么下面将解释如何正确处理这些调用。
深入分析及解决方案:
现在我们知道什么是错误的,从概念上讲,让我们看看上面的代码中到底有什么问题,以及这个问题的正确解决方案是怎样的:
颜色 = 线程状态:
- 绿松石 – 睡眠
- 橙色 - 等待
- 绿色 - 可运行
现在让我们研究 3 段代码以及它们如何影响调度程序和应用程序的性能。为了强制执行此行为,应用程序已置于以下负载下:
- [a] 继续请求 GET 请求(请参阅上面最初问题中的代码),它不会在那里阻塞
- [b] 然后在一段时间后触发 2000 个 POST 请求,这将导致 5 秒阻塞,然后返回未来
1) [bad]
错误代码上的调度程序行为:
// BAD! (due to the blocking in Future):
implicit val defaultDispatcher = system.dispatcher
val routes: Route = post {
complete {
Future { // uses defaultDispatcher
Thread.sleep(5000) // will block on the default dispatcher,
System.currentTimeMillis().toString // starving the routing infra
}
}
}
因此,我们将我们的应用程序暴露给 [a] 负载,您可以看到已经有许多 akka.actor.default-dispatcher 线程——它们正在处理请求——绿色的小片段,橙色表示其他线程实际上是空闲的。
然后我们启动 [b] 加载,这会导致这些线程阻塞——你可以看到一个早期的线程“default-dispatcher-2,3,4”在之前空闲后进入阻塞状态。我们还观察到池在增长——新线程启动“default-dispatcher-18,19,20,21...”但是它们立即进入睡眠状态(!)——我们在这里浪费了宝贵的资源!
此类启动线程的数量取决于默认调度程序配置,但可能不会超过 50 左右。由于我们刚刚触发了 2k 个阻塞操作,我们使整个线程池处于饥饿状态——阻塞操作占主导地位,以至于路由基础设施没有线程可用于处理其他请求——非常糟糕!
让我们做点什么(顺便说一句,这是 Akka 的最佳实践——始终隔离阻塞行为,如下所示):
2) [good!]
调度程序行为良好的结构化代码/调度程序:
在您application.conf
配置此调度程序专用于阻止行为:
my-blocking-dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
// in Akka previous to 2.4.2:
core-pool-size-min = 16
core-pool-size-max = 16
max-pool-size-min = 16
max-pool-size-max = 16
// or in Akka 2.4.2+
fixed-pool-size = 16
}
throughput = 100
}
您应该阅读Akka Dispatchers文档中的更多内容,以了解此处的各种选项。但主要的一点是,我们选择了一个ThreadPoolExecutor
具有硬性限制的线程,它保持可用于阻塞操作。大小设置取决于您的应用程序做什么,以及您的服务器有多少核心。
接下来我们需要使用它,而不是默认的:
// GOOD (due to the blocking in Future):
implicit val blockingDispatcher = system.dispatchers.lookup("my-blocking-dispatcher")
val routes: Route = post {
complete {
Future { // uses the good "blocking dispatcher" that we configured,
// instead of the default dispatcher – the blocking is isolated.
Thread.sleep(5000)
System.currentTimeMillis().toString
}
}
}
我们使用相同的负载对应用程序施加压力,首先是一些正常请求,然后我们添加阻塞请求。这就是 ThreadPools 在这种情况下的行为方式:
因此,最初的正常请求很容易由默认调度程序处理,您可以在那里看到几条绿线 - 那是实际执行(我并没有真正将服务器置于重负载下,所以它大部分是空闲的)。
现在,当我们开始发出阻塞操作时,my-blocking-dispatcher-*
启动并启动到配置的线程数。它处理那里的所有睡眠。此外,在这些线程上没有任何事情发生一段时间后,它会关闭它们。如果我们用另一堆阻塞来攻击服务器,池将启动新线程来处理它们的 sleep()-ing,但与此同时——我们不会浪费我们宝贵的线程在“只是呆在那里并且没做什么”。
使用此设置时,正常 GET 请求的吞吐量不会受到影响,它们仍然很高兴地在(仍然非常免费的)默认调度程序上提供服务。
这是处理反应式应用程序中任何类型阻塞的推荐方法。它通常被称为“屏蔽”(或“隔离”)应用程序的不良行为部分,在这种情况下,不良行为是休眠/阻塞。
3) [workaround-ish]
blocking
正确应用时的调度程序行为:
在此示例中,我们使用scaladoc forscala.concurrent.blocking
方法,该方法可以在遇到阻塞操作时提供帮助。它通常会导致更多线程被启动以在阻塞操作中幸存下来。
// OK, default dispatcher but we'll use `blocking`
implicit val dispatcher = system.dispatcher
val routes: Route = post {
complete {
Future { // uses the default dispatcher (it's a Fork-Join Pool)
blocking { // will cause much more threads to be spun-up, avoiding starvation somewhat,
// but at the cost of exploding the number of threads (which eventually
// may also lead to starvation problems, but on a different layer)
Thread.sleep(5000)
System.currentTimeMillis().toString
}
}
}
}
该应用程序的行为将如下所示:
您会注意到创建了很多新线程,这是因为阻塞提示“哦,这将是阻塞的,所以我们需要更多线程”。这导致我们被阻塞的总时间小于 1) 示例中的时间,但是在阻塞操作完成后我们有数百个线程什么都不做......当然,它们最终会被关闭(FJP 会这样做),但在一段时间内,我们将运行大量(不受控制的)线程,与 2) 解决方案相比,我们确切地知道有多少线程专门用于阻塞行为。
总结:永远不要阻止默认调度程序:-)
最佳实践是使用中所示的模式2)
,为可用的阻塞操作提供一个调度程序,并在那里执行它们。
讨论了 Akka HTTP 版本:2.0.1
使用的分析器:很多人私下问我这个答案,我用什么分析器来可视化上面图片中的线程状态,所以在这里添加这个信息:我使用了YourKit,它是一个很棒的商业分析器(OSS 免费),不过您可以使用来自 OpenJDK的免费 VisualVM 获得相同的结果。