0

我有来自 SparkStreaming 的数据流。我需要处理并最终想将数据存储在 Cassandra 中。所以,早些时候我试图使用 SparkCassandra 连接器。但它没有提供对工作人员的 SparkStreaming Context 对象的访问权限。所以,我必须使用单独的 cassandra-scala 驱动程序。因此,我最终得到了phantom。现在,我的问题是我已经在 cassnandra 中定义了列族。那么,我如何从 scala 中进行选择和更新查询。

我已经关注了这些文档链接1 ,但我不明白为什么我们需要在客户端(scala 代码)端给出表定义。为什么我们不能只是给予KeyspaceClusterPointsColumnFamily完成它。

     object CustomConnector {
       val hosts = Seq("IP1", "IP2")
       val Connector = ContactPoints(hosts).keySpace("KEYSPACE_NAME")
    }

      realTimeAgg.foreachRDD{ x => if (x.toLocalIterator.nonEmpty) {
                                x.foreachPartition {
                         How to achieve select/insert in Cassandra table here using phantom
    }
4

1 回答 1

0

使用 phantom 尚无法实现这一点,我们正在积极努力phantom-spark让您做到这一点,但在现阶段,这还有几个月的时间。

在此期间,您将不得不依赖 spark cassandra 连接器并使用非类型安全 API 来实现此目的。这是一个更不幸的设置,但在不久的将来,这将得到解决。

于 2016-09-15T20:25:33.190 回答