考虑 AlpakkaSlick.source
方法的源代码:
/**
* Scala API: creates a Source[T, NotUsed] that performs the
* specified query against the (implicitly) specified
* Slick database and streams the results.
* This works for both "typed" Slick queries
* and "plain SQL" queries.
*
* @param streamingQuery The Slick query to execute, which can
* be either a "typed" query or a "plain SQL"
* query produced by one of the Slick "sql..."
* String interpolators
* @param session The database session to use.
*/
def source[T](
streamingQuery: StreamingDBIO[Seq[T], T]
)(implicit session: SlickSession): Source[T, NotUsed] =
Source.fromPublisher(session.db.stream(streamingQuery))
session.db.stream(streamingQuery))
上面的结果是 a ,它是传递给 Akka Stream 的DatabasePublisher
Reactive Streams 。不要担心尝试为数据子集创建多个流;您可以安全地使用检索表中所有行的查询并将结果作为单个流处理。Publisher
Source.fromPublisher
Source
需要注意的一件事是,您可能需要配置一些在 Alpakka 文档中没有提到的设置,但在Slick 文档中:
注意:某些数据库系统可能需要以某种方式设置会话参数以支持流式传输,而无需在客户端的内存中一次缓存所有数据。例如,PostgreSQL 需要.withStatementParameters(rsType = ResultSetType.ForwardOnly, rsConcurrency = ResultSetConcurrency.ReadOnly, fetchSize = n)
(具有所需的页面大小n
)和.transactionally
正确的流式传输。
因此,例如,如果您使用的是 PostgreSQL,那么您Source
可能如下所示:
val source =
Slick.source(
TableQuery[Items]
.result
.withStatementParameters(
rsType = ResultSetType.ForwardOnly,
rsConcurrency = ResultSetConcurrency.ReadOnly,
fetchSize = 10
)
.transactionally)
TableQuery[Items].result
返回与 关联的表中的所有行Items
。
尽管有文档,我已经成功地将 SlickDatabasePublisher
与 Akka Streams 结合使用来从 PostgreSQL 中的表中检索和更新数百万行,而无需设置withStatementParameters
或transactionally
. 在没有这些设置的情况下尝试:
val source = Slick.source(TableQuery[Items].result)