-1

我以以下形式从kafka接收数据

{"email":"test@example","firstname":"Example","lastname":"User"}

我想访问电子邮件 ID 和名字,并希望将其与来自 cassandra 的数据以以下形式进行比较:

CassandraRow{email: abc@xyz.com}
4

1 回答 1

0

您需要使用joinWithCassandraTable功能与 Cassandra 执行连接...

为了更有效,您可能需要对从 Kafka 获得的 RDD 重新分区,以匹配 Cassandra 表中的分区。代码可能如下所示:

val resultRdd = kafkaRDD.repartitionByCassandraReplica("ks","emails")
   .joinWithCassandraTable("ks","emails")

之后,您可以分析名称是否匹配等。加入后,您应该只获得 Cassandra 中有电子邮件的记录......

于 2018-07-07T13:14:16.510 回答