5

我在更新键空间中的表时遇到了 scala 上的 spark cassandra 连接器的问题

这是我的一段代码

val query = "UPDATE " + COLUMN_FAMILY_UNIQUE_TRAFFIC + DATA_SET_DEVICE +
                        " SET a= a + " + b + " WHERE x=" +
                        x + " AND y=" + y +
                        " AND z=" + x

println(query)

val KeySpace    = new CassandraSQLContext(sparkContext)
KeySpace.setKeyspace(KEYSPACE)

hourUniqueKeySpace.sql(query)

当我执行此代码时,我收到这样的错误

Exception in thread "main" java.lang.RuntimeException: [1.1] failure: ``insert'' expected but identifier UPDATE found

知道为什么会这样吗?我怎样才能解决这个问题?

4

2 回答 2

6

可以通过 spark-cassandra-connector更新具有计数器列的表。您将不得不使用 DataFrames 和DataFrameWriter方法 save 模式“append”(或SaveMode .Append 如果您愿意)。检查代码DataFrameWriter.scala

例如,给定一个表:

cqlsh:test> SELECT * FROM name_counter ;

 name    | surname | count
---------+---------+-------
    John |   Smith |   100
   Zhang |     Wei |  1000
 Angelos |   Papas |    10

代码应如下所示:

val updateRdd = sc.parallelize(Seq(Row("John",    "Smith", 1L),
                                   Row("Zhang",   "Wei",   2L),
                                   Row("Angelos", "Papas", 3L)))

val tblStruct = new StructType(
    Array(StructField("name",    StringType, nullable = false),
          StructField("surname", StringType, nullable = false),
          StructField("count",   LongType,   nullable = false)))

val updateDf  = sqlContext.createDataFrame(updateRdd, tblStruct)

updateDf.write.format("org.apache.spark.sql.cassandra")
.options(Map("keyspace" -> "test", "table" -> "name_counter"))
.mode("append")
.save()

更新后:

 name    | surname | count
---------+---------+-------
    John |   Smith |   101
   Zhang |     Wei |  1002
 Angelos |   Papas |    13

通过将 RDD 隐式转换为DataFrameimport sqlContext.implicits._并使用.DataFrame 转换可以更简单.toDF()

检查此玩具应用程序的完整代码: https ://github.com/kyrsideris/SparkUpdateCassandra/tree/master

由于版本在这里非常重要,以上适用于 Scala 2.11.7、Spark 1.5.1、spark-cassandra-connector 1.5.0-RC1-s_2.11、Cassandra 3.0.5。DataFrameWriter 被指定为@Experimentalsince @since 1.4.0

于 2016-04-21T11:29:11.217 回答
3

我相信您无法通过 SPARK 连接器进行本地更新。请参阅文档

“Spark Cassandra 连接器的默认行为是在插入 cassandra 表时覆盖集合。要覆盖此行为,您可以指定自定义映射器,其中包含有关如何处理集合的说明。”

因此,您实际上需要使用现有键插入一条新记录。

于 2015-08-06T01:40:08.653 回答