I am using slick-3.0.0 and trying to use its streaming support.

Suppose there are two tables, AccountsTable and PreferencesTable.

I want to fetch some information from PreferencesTable and use it while streaming AccountsTable. For example (see the TODO):

val somePrefQuery: Query[Rep[String], ...] = PreferencesTable.filter(...)
val somePrefAction = somePrefQuery.result
val somePrefStream = db.stream(somePrefAction)

val accountsStream: DatabasePublisher[String] = 
                              db.stream(AccountsTable.map(_.id).result)

accountsStream.mapResult { accountId: String =>
   //TODO how to get somePref value from 
   //     somePrefQuery or somePrefAction or somePrefStream
   // Is there best approach for such task?
   val somePref: String = ???

   val result:(String, String) = (accountId, somePref)
   result
}
1 Answer

Akka Streams provides functionality that lets you combine streams in the way you describe.

First, create a Source of your accountId values:

import akka.stream.scaladsl.Source

val accountIdSrc : Source[String, _] = 
  Source fromPublisher (db stream (AccountsTable.map(_.id).result))

That Source can then be attached to the preference-query logic:

def queryForPrefs(accountId : String) =
  PreferencesTable
    .filter(_.accountId === accountId)
    .map(_.pref)
    .result

case class PrefData(accountId : String, somePref : String)

val accountAndPrefSrc : Source[PrefData, _] = 
  accountIdSrc flatMapConcat { accountId =>
    Source
      .fromPublisher(db stream queryForPrefs(accountId))
      .map(pref => PrefData(accountId, pref))
  }
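
To see how flatMapConcat behaves without a database, here is a minimal sketch that swaps the Slick publishers for in-memory Sources. The account ids, the preference values, the prefsByAccount map, and the prefsFor helper are all hypothetical stand-ins for the real queries, and the code assumes Akka 2.6 or later, where the materializer is derived implicitly from the ActorSystem (older versions need an explicit ActorMaterializer).

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

final case class PrefData(accountId: String, somePref: String)

object FlatMapConcatSketch {
  // Hypothetical in-memory stand-in for PreferencesTable.
  private val prefsByAccount: Map[String, List[String]] = Map(
    "a1" -> List("dark"),
    "a2" -> List("light", "compact"),
    "a3" -> Nil
  )

  // Stand-in for Source.fromPublisher(db stream queryForPrefs(accountId)).
  def prefsFor(accountId: String): Source[String, NotUsed] =
    Source(prefsByAccount.getOrElse(accountId, Nil))

  def run(): Seq[PrefData] = {
    // Assumes Akka 2.6+: runWith picks up a materializer from this implicit system.
    implicit val system: ActorSystem = ActorSystem("sketch")
    try {
      // Stand-in for the stream of AccountsTable ids.
      val accountIdSrc: Source[String, NotUsed] = Source(List("a1", "a2", "a3"))

      // One inner source per account id, consumed fully and in order.
      val accountAndPrefSrc: Source[PrefData, NotUsed] =
        accountIdSrc.flatMapConcat { accountId =>
          prefsFor(accountId).map(pref => PrefData(accountId, pref))
        }

      // Blocking here is only for the demo; real code would keep the Future.
      Await.result(accountAndPrefSrc.runWith(Sink.seq), 5.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

With this sample data the stream yields PrefData(a1,dark), PrefData(a2,light), PrefData(a2,compact). Account a3 has no preferences, so it emits nothing, which means the pattern behaves like an inner join. It also issues one inner query per account, so for large tables a single joined query may be worth considering.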
answered 2017-06-12T16:25:12.517