我有以下代码:-
collection.foreachRDD(rdd =>
{
if (!rdd.partitions.isEmpty) {
println("RDD collected")
try {
val dfs = rdd.toDF()
dfs.write.format("org.apache.spark.sql.cassandra").options(Map("table" -> "table", "keyspace" -> "db", "cluster" -> "Test Cluster"))
.mode(SaveMode.Append).save()
} catch {
case e: Exception => e.printStackTrace
}
println("Written to cassandra")
} else {
println("blank rdd")
}
})
现在,我正在寻找一种解决方案,我可以使用线程或其他方式并行写入 Cassandra,因为我的写入速度非常慢,大约每秒 2000-2200 次。谢谢,