2

我正在使用 Akka Streams 和 Slick 查询一个大型 MySQL 表,但它以OutOfMemoryError. 似乎 Slick 正在将所有结果加载到内存中(如果查询仅限于几行,它不会失败)。为什么会出现这种情况,解决方案是什么?

val dbUrl = "jdbc:mysql://..."

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.alpakka.slick.scaladsl.SlickSession
import akka.stream.alpakka.slick.scaladsl.Slick
import akka.stream.scaladsl.Source
import akka.stream.{ActorMaterializer, Materializer}
import com.typesafe.config.ConfigFactory
import slick.jdbc.GetResult

import scala.concurrent.Await
import scala.concurrent.duration.Duration

val slickDbConfig = s"""
 |profile = "slick.jdbc.MySQLProfile$$"
 |db {
 |  dataSourceClass = "slick.jdbc.DriverDataSource"
 |  properties = {
 |    driver = "com.mysql.jdbc.Driver",
 |    url = "$dbUrl"
 |  }
 |}
 |""".stripMargin

implicit val actorSystem: ActorSystem = ActorSystem()
implicit val materializer: Materializer = ActorMaterializer()
implicit val slickSession: SlickSession = SlickSession.forConfig(ConfigFactory.parseString(slickDbConfig))
import slickSession.profile.api._

val responses: Source[String, NotUsed] = Slick.source(
  sql"select my_text from my_table".as(GetResult(r => r.nextString())) // limit 100
)

val future = responses.runForeach((myText: String) =>
  println("my_text: " + myText.length)
)

Await.result(future, Duration.Inf)
4

1 回答 1

3

Slick 文档中:

注意:某些数据库系统可能需要以某种方式设置会话参数以支持流式传输,而无需在客户端的内存中一次缓存所有数据。例如,PostgreSQL 需要.withStatementParameters(rsType = ResultSetType.ForwardOnly, rsConcurrency = ResultSetConcurrency.ReadOnly, fetchSize = n)(具有所需的页面大小n)和.transactionally正确的流式传输。

换句话说,为了防止数据库将所有查询结果加载到内存中,可能需要额外的配置。此配置取决于数据库。MySQL 文档说明如下:

默认情况下,ResultSet 被完全检索并存储在内存中。在大多数情况下,这是最有效的操作方式,并且由于 MySQL 网络协议的设计,更容易实现。如果您正在使用具有大量行或大值的 ResultSet,并且无法在 JVM 中为所需的内存分配堆空间,您可以告诉驱动程序一次将结果流回一行。

Statement要启用此功能,请按以下方式创建实例:

stmt = conn.createStatement(java.sql.ResultSet.TYPE_FORWARD_ONLY,
              java.sql.ResultSet.CONCUR_READ_ONLY);
stmt.setFetchSize(Integer.MIN_VALUE);

只进、只读结果集与 fetch 大小的组合Integer.MIN_VALUE用作驱动程序逐行流式传输结果集的信号。

在 Slick 中设置上述配置:

import slick.jdbc._

val query =
  sql"select my_text from my_table".as(GetResult(r => r.nextString()))
    .withStatementParameters(
      rsType = ResultSetType.ForwardOnly,
      rsConcurrency = ResultSetConcurrency.ReadOnly,
      fetchSize = Int.MinValue
    )//.transactionally <-- I'm not sure whether you need ".transactionally"

val responses: Source[String, NotUsed] = Slick.source(query)
于 2018-04-13T17:16:49.700 回答