3

如何使用 cassandra spark 连接器编写可选案例类?

例子 :

case class User(name : String, address : Option[Address])
case class Address(street : String, city : String)

当我尝试将用户保存到 cassandra 时,rdd.saveToCassandra它会引发错误

Failed to get converter for field "address" of type scala.Option[Address] in User mapped to column "address" of "testspark.logs_raw"

我试图实现一个TypeConverter但没有奏效。

但是,嵌套案例类已正确转换为 cassandra UDT,并且接受可选字段。

在不改变数据模型的情况下有什么好的方法来处理这个问题吗?

4

1 回答 1

1

只是为了能见度。在现代版本中一切正常 - SCC 1.4.0-1.6.0 周围的 UDT 有很多变化,SCC 2.0.8 中还有许多性能优化。使用 SCC 2.5.1,RDD API 可以正确映射所有内容 - 例如,如果我们有以下 UDT 和表:

cqlsh> create type test.address (street text, city text);
cqlsh> create table test.user(name text primary key, address test.address);
cqlsh> insert into test.user(name, address) values 
   ('with address', {street: 'street 1', city: 'city1'});
cqlsh> insert into test.user(name) values ('without address');
cqlsh> select * from test.user;

 name            | address
-----------------+-------------------------------------
    with address | {street: 'street 1', city: 'city1'}
 without address |                                null

(2 rows)

然后 RDD API 能够在读取数据时正确拉取所有内容:

scala> import com.datastax.spark.connector._
import com.datastax.spark.connector._

scala> case class Address(street : String, city : String)
defined class Address

scala> case class User(name : String, address : Option[Address])
defined class User

scala> val data = sc.cassandraTable[User]("test", "user")
data: com.datastax.spark.connector.rdd.CassandraTableScanRDD[User] = CassandraTableScanRDD[0] at RDD at CassandraRDD.scala:18

scala> data.collect
res0: Array[User] = Array(User(without address,None), 
   User(with address,Some(Address(street 1,city1))))
于 2020-07-29T07:04:50.527 回答