0

我有一个非常大的关系数据库数据集,我想在弹性搜索中编制索引。检索数据的查询由多个连接和所有其他 SQL 好东西组成。数据被分组/处理(在内存中)以创建有意义的 json 表示,并从结果中创建批量更新并使用elastic4sscala 客户端发送到弹性搜索。

我想在这个过程中引入流式传输slickelastic支持它。

我遇到的问题是,仅当所有结果(对于给定的关系)都加载到内存中时,内存中的分组和转换为 json 才有意义(由于多个连接/左连接,我需要按 id 分组并映射结果在记忆中)。流媒体将如何处理?

4

1 回答 1

1

我已经使用像下面的代码一样的流畅的反应流解决了多个连接的流式传输。这将使用 Publisher 创建流,Elastic 需要订阅此发布者才能开始检索流数据。

def listTransactions(): DatabasePublisher[Transaction] = {
  val join = for {
    (t, a) <- transactions joinLeft (account) on (_.accountid === _.id)
  } yield (t, a)
 db.stream(join.result.withStatementParameters(rsType = ResultSetType.ForwardOnly,rsConcurrency = ResultSetConcurrency.ReadOnly, fetchSize = 1000).transactionally).mapResult {
  result => result._1
  }
}

注意:-代码中的_1代表事务表。

于 2020-01-04T17:12:12.673 回答