0

我正在使用带有 spark cassandra 连接器 1.2.3 的 spark 1.2,我正在尝试更新一些表行:

例子:

CREATE TABLE myTable ( 
a text, 
b text, 
c text, 
date timestamp, 
d text, 
e text static, 
f text static, 
PRIMARY KEY ((a, b, c), date, d) 
) WITH CLUSTERING ORDER BY (date ASC, d ASC)

val interactions = sc.cassandraTable[(String, String, String, DateTime, String, String)]("keySpace", "myTable"). 
select("a","b","c","date", "d", "e","f") 
val empty = interactions.filter(r => r._6 == null).cache() 
empty.count()

我只计算包含“e”为空的行数,并将它们替换为“b”的值

 val update_inter = empty.map( r =>  (r._1,r._2, r._3, r._4, r._5, r._2)) 
 update_inter.saveToCassandra("keySpace", "myTable", SomeColumns("a","b","c","date", "d", "e", "f"))

这在我签入 cqlsh 时有效,但是当我通过 spark cassandra 请求相同的行时,我仍然得到 null 值。

这是 spark cassandra 连接器中的错误吗?谢谢你的帮助。

4

2 回答 2

0

随着插入/更新的发生,Cassandra 不会在原地覆盖行,而是在另一个 SSTable 中写入插入或更新数据的新时间戳版本。

您的 Spark 作业不是更新现有行而是写入新行,或者您的 SSTables 尚未将更改写入磁盘。如果要将结果写入新表,则 null 'e' 列的计数为零。

尝试使用 nodetool 刷新命令并阅读以下内容:Cassandra Compaction

于 2015-09-28T16:20:02.843 回答
0

.mode('append') 用于附加我猜。我面临类似的问题,但使用 java 连接器,但似乎在 python 中此选项可用

于 2017-07-26T10:23:47.277 回答