1

我需要将 Cassandra 分区编写为镶木地板文件。由于我无法在 foreach 函数中共享和使用 sparkSession。首先,我调用 collect 方法收集驱动程序中的所有数据,然后将 parquet 文件写入 HDFS,如下所示。

感谢这个链接https://github.com/datastax/spark-cassandra-connector/blob/master/doc/16_partitioning.md

我能够得到我的分区行。每当从 cassandra 表中读取分区时,我想将分区行写入单独的镶木地板文件。我还尝试了 sparkSQLContext 该方法将任务结果写为临时的。我想,所有的任务都完成了。我会看到镶木地板文件。

有什么方便的方法吗?

val keyedTable : CassandraTableScanRDD[(Tuple2[Int, Date], MyCassandraTable)] = getTableAsKeyed()

keyedTable.groupByKey
      .collect
      .foreach(f => {
        import sparkSession.implicits._
        val items = f._2.toList
        val key = f._1
        val baseHDFS = "hdfs://mycluster/parquet_test/"
        val ds = sparkSession.sqlContext.createDataset(items)

    ds.write
      .option("compression", "gzip")
      .parquet(baseHDFS + key._1 + "/" + key._2)

  })
4

1 回答 1

0

为什么不到处使用 Spark SQL 并使用 Parquet 的内置功能按分区写入数据,而不是自己创建目录层次结构?

像这样的东西:

import org.apache.spark.sql.cassandra._
val data = spark.read.cassandraFormat("table", "keyspace").load()

data.write
      .option("compression", "gzip")
      .partitionBy("col1", "col2")
      .parquet(baseHDFS)

在这种情况下,它将为col&的每个值创建一个单独的目录col2作为嵌套目录,名称如下:${column}=${value}. 然后,当您阅读时,您可能会强制只阅读特定的值。

于 2019-12-26T15:14:18.923 回答