1

我正在尝试使用谷歌云服务实现一个实时推荐系统。我已经使用 Kafka、Apache Storm 和 Cassandra 构建了引擎,但我想使用 Cloud Pub/Sub、Cloud Dataflow 和 Cloud Bigtable 在 Scala 中创建相同的引擎。

到目前为止,在 Cassandra 中,由于我在 Apache Storm bolt 操作期间多次读写,我已经实现了以下连接器MyDatabase.scala,它启动与数据库的单例连接,并在 bolt 内使用此连接来读取和更新用户使用来自 Kafka spout 的流数据的表。我为 Cassandra 使用了 Phantom Scala API 驱动程序。

MyDatabase.scala

import scala.concurrent.Await
import scala.concurrent.duration._
import com.websudos.phantom.dsl._


object CustomConnector {

  val hosts = Seq("localhost")

  val connector = ContactPoints(hosts).keySpace(""my_keyspace")

}

class MyDatabase(val keyspace: KeySpaceDef) extends Database(keyspace) {
  object users extends Users with keyspace.Connector
}

object MyDatabase extends MyDatabase(CustomConnector.connector) {
  Await.result(MyDatabase.autocreate.future(), 5.seconds)
}

用户.scala

import com.websudos.phantom.CassandraTable
import com.websudos.phantom.dsl._

import scala.concurrent.Future

case class User(
                 id: String,
                 items: Map[String, Int]
               )

class UsersTable extends CassandraTable[Users, User] {

  object id extends StringColumn(this) with PartitionKey[String]
  object items extends MapColumn[String, Int](this)

  def fromRow(row: Row): User = {
    User(
      id(row),
      items(row)
    )
  }
}

abstract class Users extends UsersTable with RootConnector {

  def store(user: User): Future[ResultSet] = {
    insert.value(_.id, user.id).value(_.items, user.items)
      .consistencyLevel_=(ConsistencyLevel.ALL)
      .future()
  }

  def getById(id: String): Future[Option[User]] = {
    select.where(_.id eqs id).one()
  }
}

数据流管道将如下所示:

  1. 从 Pub/Sub 提取流数据。
  2. 在单个 parDo 中实现逻辑,我们将使用从 Pub/Sub 提取的数据生成的一些新值更新 Bigtable 中的多个表。

当您使用 Phantom DSL 时,创建与 Cassandra 的连接非常简单。我的问题是,是否有任何等价的库,例如 Google Cloud Bigtable 的 Phantom,或者使用 Google Cloud API 和 Scio 实现此功能的正确方法是什么(因为我将使用 Scala 实现 Dataflow 管道)。似乎我找不到任何相关示例来建立与 Bigtable 的连接并在 Scala 的 Dataflow 管道中使用此连接。

谢谢

4

1 回答 1

1

在处理 a 中的多个元素之间共享与数据库的连接的 Beam 方法DoFn是使用@Setupand@Teardown方法。有关示例,请参阅Beam Cassandra 连接器的源代码。

于 2018-02-05T19:02:23.503 回答