14

我正在尝试弄清楚如何使用流畅的流媒体。我使用 slick 3.0.0 和 postgres 驱动程序

情况如下:服务器必须将客户端数据序列拆分为受大小(以字节为单位)限制的块。所以,我写了以下精巧的查询:

val sequences = TableQuery[Sequences]
def find(userId: Long, timestamp: Long) = sequences.filter(s ⇒ s.userId === userId && s.timestamp > timestamp).sortBy(_.timestamp.asc).result
val seq = db.stream(find(0L, 0L))

我将 seq 与 akka-streams 结合起来Source,编写了 custom PushPullStage,它限制了数据的大小(以字节为单位)并在达到大小限制时完成上游。它工作得很好。问题是 - 当我查看 postgres 日志时,我看到这样的查询 select * from sequences where user_id = 0 and timestamp > 0 order by timestamp;

因此,乍一看,似乎有很多(并且不必要的)数据库查询正在进行,只是在每个查询中使用了几个字节。使用 Slick 进行流式传输以最小化数据库查询并充分利用每个查询中传输的数据的正确方法是什么?

4

2 回答 2

18

使用 Slick 和 Postgres 进行流式传输的“正确方法”包括三件事:

  1. 必须使用 db.stream()

  2. autoCommit必须在 JDBC 驱动程序中禁用。一种方法是通过后缀使查询在事务中运行.transactionally

  3. 必须设置fetchSize为 0 以外的值,否则 postgres 会将整个结果集一次性推送到客户端。

前任:

DB.stream(
  find(0L, 0L)
    .transactionally
    .withStatementParameters(fetchSize = 1000)
).foreach(println)

有用的链接:

https://github.com/slick/slick/issues/1038

https://github.com/slick/slick/issues/809

于 2015-07-12T18:58:11.327 回答
0

在 Slick 中流式传输的正确方法是文档中提供的

val q = for (c <- coffees) yield c.image
val a = q.result
val p1: DatabasePublisher[Blob] = db.stream(a.withStatementParameters(
  rsType = ResultSetType.ForwardOnly, 
  rsConcurrency = ResultSetConcurrency.ReadOnly, 
  fetchSize = 1000 /*your fetching size*/
).transactionally)
于 2020-01-03T14:34:51.947 回答