可以通过 spark-cassandra-connector更新具有计数器列的表。您将不得不使用 DataFrames 和DataFrameWriter方法 save 模式“append”(或SaveMode .Append 如果您愿意)。检查代码DataFrameWriter.scala。
例如,给定一个表:
cqlsh:test> SELECT * FROM name_counter ;
name | surname | count
---------+---------+-------
John | Smith | 100
Zhang | Wei | 1000
Angelos | Papas | 10
代码应如下所示:
val updateRdd = sc.parallelize(Seq(Row("John", "Smith", 1L),
Row("Zhang", "Wei", 2L),
Row("Angelos", "Papas", 3L)))
val tblStruct = new StructType(
Array(StructField("name", StringType, nullable = false),
StructField("surname", StringType, nullable = false),
StructField("count", LongType, nullable = false)))
val updateDf = sqlContext.createDataFrame(updateRdd, tblStruct)
updateDf.write.format("org.apache.spark.sql.cassandra")
.options(Map("keyspace" -> "test", "table" -> "name_counter"))
.mode("append")
.save()
更新后:
name | surname | count
---------+---------+-------
John | Smith | 101
Zhang | Wei | 1002
Angelos | Papas | 13
通过将 RDD 隐式转换为DataFrameimport sqlContext.implicits._
并使用.DataFrame 转换可以更简单.toDF()
。
检查此玩具应用程序的完整代码:
https ://github.com/kyrsideris/SparkUpdateCassandra/tree/master
由于版本在这里非常重要,以上适用于 Scala 2.11.7、Spark 1.5.1、spark-cassandra-connector 1.5.0-RC1-s_2.11、Cassandra 3.0.5。DataFrameWriter 被指定为@Experimental
since @since 1.4.0
。