4

我有一个使用 Akka 的系统,该系统目前通过消息队列处理传入的流数据。当一个记录到达然后它被处理时,mq 被确认并且记录被传递以在系统内进一步处理。

现在我想添加对使用 DB 作为输入的支持。
输入源能够处理 DB 的方法是什么(应该以接收器可以处理的速度输入 > 100M 记录 - 所以我假设反应/akka-streams?)?

4

2 回答 2

11

光滑的图书馆

流畅的流媒体通常是这样做的。

稍微扩展一下 slick 文档以包含 akka 流:

//SELECT Name from Coffees
val q = for (c <- coffees) yield c.name

val action = q.result

type Name = String

val databasePublisher : DatabasePublisher[Name] = db stream action

import akka.stream.scaladsl.Source

val akkaSourceFromSlick : Source[Name, _] = Source fromPublisher databasePublisher

现在akkaSourceFromSlick就像任何其他 akka 流一样Source

“老派”结果集

也可以使用普通的ResultSet,没有光滑的,作为 akka 流的“引擎”。我们将利用流Source可以从Iterator.

首先使用标准 jdbc 技术创建 ResultSet:

import java.sql._

val resultSetGenerator : () => Try[ResultSet] = Try {
  val statement : Statement = ???
  statement executeQuery "SELECT Name from Coffees"
}

当然,所有 ResultSet 实例都必须将光标移动到第一行之前:

val adjustResultSetBeforeFirst : (ResultSet) => Try[ResultSet] = 
  (resultSet) => Try(resultSet.beforeFirst()) map (_ => resultSet)

一旦我们开始遍历行,我们就必须从正确的列中提取值:

val getNameFromResultSet : ResultSet => Name = _ getString "Name"

现在我们可以实现接口以从 ResultSetIterator创建一个:Iterator[Name]

val convertResultSetToNameIterator : ResultSet => Iterator[Name] = 
  (resultSet) => new Iterator[Try[Name]] {
    override def hasNext : Boolean  = resultSet.next
    override def next() : Try[Name] = Try(getNameFromResultSet(resultSet))
   } flatMap (_.toOption)

最后,将所有部分粘合在一起以创建我们需要传递给的函数Source.fromIterator

val resultSetGenToNameIterator : (() => Try[ResultSet]) => () => Iterator[Name] = 
  (_ : () => Try[ResultSet])
    .andThen(_ flatMap adjustResultSetBeforeFirst) 
    .andThen(_ map convertResultSetToNameIterator) 
    .andThen(_ getOrElse Iterator.empty)

这个迭代器现在可以提供一个 Source:

val akkaSourceFromResultSet : Source[Name, _] = 
  Source fromIterator resultSetGenToNameIterator(resultSetGenerator)

这个实现一直到数据库都是反应式的。由于 ResultSet 一次预取有限数量的行,数据只会在流Sink信号需要时通过数据库从硬盘驱动器中取出。

于 2017-07-10T15:11:42.020 回答
1

我发现 Alpakka 文档非常好,并且比 Java Publisher 接口更容易使用反应流。

Alpakka 项目是一个开源计划,旨在为 Java 和 Scala 实现流感知、反应式集成管道。它建立在 Akka Streams 之上,从一开始就设计用于理解原生流,并为反应式和面向流的编程提供 DSL,并内置支持背压

带有 Slick 的 Alpakka 文档:https ://doc.akka.io/docs/alpakka/current/slick.html

Alpakka Github:https ://github.com/akka/alpakka

于 2020-05-24T11:23:06.273 回答