我需要将 Cassandra 分区编写为镶木地板文件。由于我无法在 foreach 函数中共享和使用 sparkSession。首先,我调用 collect 方法收集驱动程序中的所有数据,然后将 parquet 文件写入 HDFS,如下所示。
感谢这个链接https://github.com/datastax/spark-cassandra-connector/blob/master/doc/16_partitioning.md
我能够得到我的分区行。每当从 cassandra 表中读取分区时,我想将分区行写入单独的镶木地板文件。我还尝试了 sparkSQLContext 该方法将任务结果写为临时的。我想,所有的任务都完成了。我会看到镶木地板文件。
有什么方便的方法吗?
val keyedTable : CassandraTableScanRDD[(Tuple2[Int, Date], MyCassandraTable)] = getTableAsKeyed()
keyedTable.groupByKey
.collect
.foreach(f => {
import sparkSession.implicits._
val items = f._2.toList
val key = f._1
val baseHDFS = "hdfs://mycluster/parquet_test/"
val ds = sparkSession.sqlContext.createDataset(items)
ds.write
.option("compression", "gzip")
.parquet(baseHDFS + key._1 + "/" + key._2)
})