假设我有一个“生产者-消费者”问题:生产者向消费者发送消息,消费者使用 Scala 异步处理它们Futures
:例如future { /* do the processing */ }
.
现在假设生产者每秒产生 100 条消息。但消费者每秒只处理 10 条消息。会发生什么 ?我想会有内存泄漏。会有很多Future
对象,线程池的内部消息队列也会增长。是否有意义 ?
处理它的最佳方法是什么?
假设我有一个“生产者-消费者”问题:生产者向消费者发送消息,消费者使用 Scala 异步处理它们Futures
:例如future { /* do the processing */ }
.
现在假设生产者每秒产生 100 条消息。但消费者每秒只处理 10 条消息。会发生什么 ?我想会有内存泄漏。会有很多Future
对象,线程池的内部消息队列也会增长。是否有意义 ?
处理它的最佳方法是什么?
在 akka 中,使用了执行上下文,但似乎没有邮箱 - 值得阅读源代码,但我可以通过实验回答您的问题:
Future 没有“邮箱”,我不能 100% 确定 Akka 在幕后做了什么或执行上下文实际包含什么,但我们可以看到直接使用 future 时 akka 会耗尽内存:
scala> import scala.concurrent.Future
import scala.concurrent.Future
scala> import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.ExecutionContext.Implicits.global ^
scala> while(1==1) Future(Thread.sleep(100))
java.lang.OutOfMemoryError: Java heap space
如果我们在谈论消息,那么有一个邮箱描述了参与者消息队列的行为(由于一次只处理一条消息,它将被填满)——我将在下面解释这一点。
假设一个有界邮箱(例如一个有大小限制的邮箱),消息会发生什么。答案是取决于邮箱。首先,有界邮箱有一些设置,例如大小限制:
bounded-mailbox {
mailbox-type = "akka.dispatch.BoundedMailbox"
mailbox-capacity = 1000
mailbox-push-timeout-time = 10s
}
现在,当达到该限制时,akka 将根据邮箱的配置方式丢弃旧邮件或新邮件 - 例如使用此设置
# whether to drop older items (instead of newer) when the queue is full
discard-old-when-full = on
显然,如果存在其他资源问题,例如内存不足,那么您的应用程序可能会崩溃,这意味着消息将在存储在内存中时丢失。无界邮箱将继续堆叠邮件,直到出现错误情况,这就是您可能要使用有界邮箱的原因。
如果在不希望出现的错误情况下丢失消息,还有另一个选项 = 可以使用持久邮箱,它将消息存储在更持久的位置,例如文件中。下面是一个邮箱配置示例,它使用文件来进行更持久的邮件存储。
akka {
actor {
mailbox {
file-based {
# directory below which this queue resides
directory-path = "./_mb"
# attempting to add an item after the queue reaches this size (in items)
# will fail.
max-items = 2147483647
# attempting to add an item after the queue reaches this size (in bytes)
# will fail.
max-size = 2147483647 bytes
# attempting to add an item larger than this size (in bytes) will fail.
max-item-size = 2147483647 bytes
# maximum expiration time for this queue (seconds).
max-age = 0s
# maximum journal size before the journal should be rotated.
max-journal-size = 16 MiB
# maximum size of a queue before it drops into read-behind mode.
max-memory-size = 128 MiB
# maximum overflow (multiplier) of a journal file before we re-create it.
max-journal-overflow = 10
# absolute maximum size of a journal file until we rebuild it,
# no matter what.
max-journal-size-absolute = 9223372036854775807 bytes
# whether to drop older items (instead of newer) when the queue is full
discard-old-when-full = on
# whether to keep a journal file at all
keep-journal = on
# whether to sync the journal after each transaction
sync-journal = off
# circuit breaker configuration
circuit-breaker {
# maximum number of failures before opening breaker
max-failures = 3
# duration of time beyond which a call is assumed to be timed out and
# considered a failure
call-timeout = 3 seconds
# duration of time to wait until attempting to reset the breaker during
# which all calls fail-fast
reset-timeout = 30 seconds
}
}
}
}
}
您可以设置最大队列大小。事实上,我认为默认情况下 Akka 演员的队列是有限的,尽管我在这里很可能是错的。
这并不能真正解决问题,但最终,如果您没有足够的后端参与者来进行处理,您将无法处理所有内容。
我喜欢 Netflix 的做法:所有请求都通过监控后端运行状况的代理。如果后端花费的时间太长,他们会放弃请求并提供回退:要么是合理的默认值,要么是错误消息。他们谈论了很多关于他们的架构,例如看到这个演示文稿。
拥有多个消费者 - 使用演员池。您可以根据池中的压力动态调整其大小。见http://doc.akka.io/docs/akka/snapshot/scala/routing.html