0

我有以下代码:-

collection.foreachRDD(rdd =>
  {
    if (!rdd.partitions.isEmpty) {
      println("RDD collected")
      try {
        val dfs = rdd.toDF()
 dfs.write.format("org.apache.spark.sql.cassandra").options(Map("table" -> "table", "keyspace" -> "db", "cluster" -> "Test Cluster"))
          .mode(SaveMode.Append).save()
      } catch {
        case e: Exception => e.printStackTrace
      }
      println("Written to cassandra")
    } else {
      println("blank rdd")
    }
  })

现在,我正在寻找一种解决方案,我可以使用线程或其他方式并行写入 Cassandra,因为我的写入速度非常慢,大约每秒 2000-2200 次。谢谢,

4

0 回答 0