我正在尝试使用谷歌云服务实现一个实时推荐系统。我已经使用 Kafka、Apache Storm 和 Cassandra 构建了引擎,但我想使用 Cloud Pub/Sub、Cloud Dataflow 和 Cloud Bigtable 在 Scala 中创建相同的引擎。
到目前为止,在 Cassandra 中,由于我在 Apache Storm bolt 操作期间多次读写,我已经实现了以下连接器MyDatabase.scala,它启动与数据库的单例连接,并在 bolt 内使用此连接来读取和更新用户使用来自 Kafka spout 的流数据的表。我为 Cassandra 使用了 Phantom Scala API 驱动程序。
MyDatabase.scala
import scala.concurrent.Await
import scala.concurrent.duration._
import com.websudos.phantom.dsl._
object CustomConnector {
val hosts = Seq("localhost")
val connector = ContactPoints(hosts).keySpace(""my_keyspace")
}
class MyDatabase(val keyspace: KeySpaceDef) extends Database(keyspace) {
object users extends Users with keyspace.Connector
}
object MyDatabase extends MyDatabase(CustomConnector.connector) {
Await.result(MyDatabase.autocreate.future(), 5.seconds)
}
用户.scala
import com.websudos.phantom.CassandraTable
import com.websudos.phantom.dsl._
import scala.concurrent.Future
case class User(
id: String,
items: Map[String, Int]
)
class UsersTable extends CassandraTable[Users, User] {
object id extends StringColumn(this) with PartitionKey[String]
object items extends MapColumn[String, Int](this)
def fromRow(row: Row): User = {
User(
id(row),
items(row)
)
}
}
abstract class Users extends UsersTable with RootConnector {
def store(user: User): Future[ResultSet] = {
insert.value(_.id, user.id).value(_.items, user.items)
.consistencyLevel_=(ConsistencyLevel.ALL)
.future()
}
def getById(id: String): Future[Option[User]] = {
select.where(_.id eqs id).one()
}
}
数据流管道将如下所示:
- 从 Pub/Sub 提取流数据。
- 在单个 parDo 中实现逻辑,我们将使用从 Pub/Sub 提取的数据生成的一些新值更新 Bigtable 中的多个表。
当您使用 Phantom DSL 时,创建与 Cassandra 的连接非常简单。我的问题是,是否有任何等价的库,例如 Google Cloud Bigtable 的 Phantom,或者使用 Google Cloud API 和 Scio 实现此功能的正确方法是什么(因为我将使用 Scala 实现 Dataflow 管道)。似乎我找不到任何相关示例来建立与 Bigtable 的连接并在 Scala 的 Dataflow 管道中使用此连接。
谢谢