1

假设我有一个“生产者-消费者”问题:生产者向消费者发送消息,消费者使用 Scala 异步处理它们Futures:例如future { /* do the processing */ }.

现在假设生产者每秒产生 100 条消息。但消费者每秒只处理 10 条消息。会发生什么 ?我想会有内存泄漏。会有很多Future对象,线程池的内部消息队列也会增长。是否有意义 ?

处理它的最佳方法是什么?

4

3 回答 3

2

在 akka 中,使用了执行上下文,但似乎没有邮箱 - 值得阅读源代码,但我可以通过实验回答您的问题:

Future 没有“邮箱”,我不能 100% 确定 Akka 在幕后做了什么或执行上下文实际包含什么,但我们可以看到直接使用 future 时 akka 会耗尽内存:

scala> import scala.concurrent.Future
import scala.concurrent.Future

scala> import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.ExecutionContext.Implicits.global                  ^

scala> while(1==1) Future(Thread.sleep(100))
java.lang.OutOfMemoryError: Java heap space

如果我们在谈论消息,那么有一个邮箱描述了参与者消息队列的行为(由于一次只处理一条消息,它将被填满)——我将在下面解释这一点。

假设一个有界邮箱(例如一个有大小限制的邮箱),消息会发生什么。答案是取决于邮箱。首先,有界邮箱有一些设置,例如大小限制:

bounded-mailbox {
  mailbox-type = "akka.dispatch.BoundedMailbox"
  mailbox-capacity = 1000
  mailbox-push-timeout-time = 10s
}

现在,当达到该限制时,akka 将根据邮箱的配置方式丢弃旧邮件或新邮件 - 例如使用此设置

# whether to drop older items (instead of newer) when the queue is full
discard-old-when-full = on

显然,如果存在其他资源问题,例如内存不足,那么您的应用程序可能会崩溃,这意味着消息将在存储在内存中时丢失。无界邮箱将继续堆叠邮件,直到出现错误情况,这就是您可能要使用有界邮箱的原因。

如果在不希望出现的错误情况下丢失消息,还有另一个选项 = 可以使用持久邮箱,它将消息存储在更持久的位置,例如文件中。下面是一个邮箱配置示例,它使用文件来进行更持久的邮件存储。

akka {
  actor {
    mailbox {
      file-based {
        # directory below which this queue resides
        directory-path = "./_mb"

        # attempting to add an item after the queue reaches this size (in items)
        # will fail.
        max-items = 2147483647

        # attempting to add an item after the queue reaches this size (in bytes)
        # will fail.
        max-size = 2147483647 bytes

        # attempting to add an item larger than this size (in bytes) will fail.
        max-item-size = 2147483647 bytes

        # maximum expiration time for this queue (seconds).
        max-age = 0s

        # maximum journal size before the journal should be rotated.
        max-journal-size = 16 MiB

        # maximum size of a queue before it drops into read-behind mode.
        max-memory-size = 128 MiB

        # maximum overflow (multiplier) of a journal file before we re-create it.
        max-journal-overflow = 10

        # absolute maximum size of a journal file until we rebuild it,
        # no matter what.
        max-journal-size-absolute = 9223372036854775807 bytes

        # whether to drop older items (instead of newer) when the queue is full
        discard-old-when-full = on

        # whether to keep a journal file at all
        keep-journal = on

        # whether to sync the journal after each transaction
        sync-journal = off

        # circuit breaker configuration
        circuit-breaker {
          # maximum number of failures before opening breaker
          max-failures = 3

          # duration of time beyond which a call is assumed to be timed out and
          # considered a failure
          call-timeout = 3 seconds

          # duration of time to wait until attempting to reset the breaker during
          # which all calls fail-fast
          reset-timeout = 30 seconds
        }
      }
    }
  }
}
于 2013-08-06T13:41:43.347 回答
1

您可以设置最大队列大小。事实上,我认为默认情况下 Akka 演员的队列是有限的,尽管我在这里很可能是错的。

这并不能真正解决问题,但最终,如果您没有足够的后端参与者来进行处理,您将无法处理所有内容。

我喜欢 Netflix 的做法:所有请求都通过监控后端运行状况的代理。如果后端花费的时间太长,他们会放弃请求并提供回退:要么是合理的默认值,要么是错误消息。他们谈论了很多关于他们的架构,例如看到这个演示文稿

于 2013-08-06T12:33:28.523 回答
0

拥有多个消费者 - 使用演员池。您可以根据池中的压力动态调整其大小。见http://doc.akka.io/docs/akka/snapshot/scala/routing.html

于 2013-08-06T12:54:21.993 回答