我以以下形式从kafka接收数据
{"email":"test@example","firstname":"Example","lastname":"User"}
我想访问电子邮件 ID 和名字,并希望将其与来自 cassandra 的数据以以下形式进行比较:
CassandraRow{email: abc@xyz.com}
我以以下形式从kafka接收数据
{"email":"test@example","firstname":"Example","lastname":"User"}
我想访问电子邮件 ID 和名字,并希望将其与来自 cassandra 的数据以以下形式进行比较:
CassandraRow{email: abc@xyz.com}
您需要使用joinWithCassandraTable
功能与 Cassandra 执行连接...
为了更有效,您可能需要对从 Kafka 获得的 RDD 重新分区,以匹配 Cassandra 表中的分区。代码可能如下所示:
val resultRdd = kafkaRDD.repartitionByCassandraReplica("ks","emails")
.joinWithCassandraTable("ks","emails")
之后,您可以分析名称是否匹配等。加入后,您应该只获得 Cassandra 中有电子邮件的记录......