我有来自 SparkStreaming 的数据流。我需要处理并最终想将数据存储在 Cassandra 中。所以,早些时候我试图使用 SparkCassandra 连接器。但它没有提供对工作人员的 SparkStreaming Context 对象的访问权限。所以,我必须使用单独的 cassandra-scala 驱动程序。因此,我最终得到了phantom。现在,我的问题是我已经在 cassnandra 中定义了列族。那么,我如何从 scala 中进行选择和更新查询。
我已经关注了这些文档链接1 ,但我不明白为什么我们需要在客户端(scala 代码)端给出表定义。为什么我们不能只是给予Keyspace
,ClusterPoints
并ColumnFamily
完成它。
object CustomConnector {
val hosts = Seq("IP1", "IP2")
val Connector = ContactPoints(hosts).keySpace("KEYSPACE_NAME")
}
realTimeAgg.foreachRDD{ x => if (x.toLocalIterator.nonEmpty) {
x.foreachPartition {
How to achieve select/insert in Cassandra table here using phantom
}