1

我在 PostgreSQL 中有 100 万行和 100 多列的数据源,我想使用 Spark SQL,所以我想将此数据源转换为SchemaRDD.

Spark SQL编程指南中介绍了两种方法,一种是通过反射,也就是说我需要定义:

case class Row(Var1: Int, Var2: String, ...)

这很乏味,因为我有 100 多列。

另一种方法是“以编程方式指定架构”,这意味着我需要定义:

val schema =
  StructType(
    Seq(StructField("Var1", IntegerType), StructField("Var2", StringType), ...))

这对我来说也很乏味。

实际上,还有另一个问题,因为我PostgreSQL使用类加载我的数据库,JdbcRDD但我发现我还需要在构造函数的mapRow参数中定义模式JdbcRDD,如下所示:

def extractValues(r: ResultSet) = {
  (r.getInt("Var1"), r.getString("Var2"), ...)
}
val dbRDD = new JdbcRDD(sc, createConnection,
  "SELECT * FROM PostgreSQL OFFSET ? LIMIT ?",
  0, 1000000, 1, extractValues)

这个 API 仍然要求我自己创建模式,更糟糕的是我需要重做类似的事情来将其转换JdbcRDDSchemaRDD,那将是非常笨拙的代码。

所以我想知道这项任务的最佳方法是什么?

4

1 回答 1

2

您只需要支持有限数量的数据类型。为什么不使用

java.sql.ResultSetMetaData

例如

val rs = jdbcStatement.executeQuery("select * from myTable limit 1")
val rmeta = rs.getMetaData

读取一行,然后为每一列动态生成所需的 StructField。

你需要一个case语句来处理

val myStructFields = for (cx <- 0 until rmeta.getColumnCount) {
       val jdbcType = rmeta.getColumnType(cx)
       } yield StructField(rmeta.getColumnName(cx),jdbcToSparkType(jdbcType))

val mySchema = StructType(myStructFields.toSeq)

jdbcToSparkType 如下所示:

  def jdbcToSparkType(jdbcType: Int) = {
    jdbcType match {
       case 4 => InteegerType  
       case 6 => FloatType
        ..
   }  

更新 要生成 RDD[Row] :您将遵循类似的模式。在这种情况下,你会

val rows = for (rs.next) {
    row = jdbcToSpark(rs)
    } yield row

val rowRDD = sc.parallelize(rows)

在哪里

def jdbcToSpark(rs: ResultSet) = {
   var rowSeq = Seq[Any]()
   for (cx <- 0 to rs.getMetaData.getColumnCount) {
     rs.getColumnType(cx) match {
         case 4 => rowSeq :+ rs.getInt(cx)
          ..
     }
   }
   Row.fromSeq(rowSeq)
}

然后 val 行

于 2014-12-28T17:00:10.713 回答