我在 PostgreSQL 中有 100 万行和 100 多列的数据源,我想使用 Spark SQL,所以我想将此数据源转换为SchemaRDD
.
Spark SQL编程指南中介绍了两种方法,一种是通过反射,也就是说我需要定义:
case class Row(Var1: Int, Var2: String, ...)
这很乏味,因为我有 100 多列。
另一种方法是“以编程方式指定架构”,这意味着我需要定义:
val schema =
StructType(
Seq(StructField("Var1", IntegerType), StructField("Var2", StringType), ...))
这对我来说也很乏味。
实际上,还有另一个问题,因为我PostgreSQL
使用类加载我的数据库,JdbcRDD
但我发现我还需要在构造函数的mapRow
参数中定义模式JdbcRDD
,如下所示:
def extractValues(r: ResultSet) = {
(r.getInt("Var1"), r.getString("Var2"), ...)
}
val dbRDD = new JdbcRDD(sc, createConnection,
"SELECT * FROM PostgreSQL OFFSET ? LIMIT ?",
0, 1000000, 1, extractValues)
这个 API 仍然要求我自己创建模式,更糟糕的是我需要重做类似的事情来将其转换JdbcRDD
为SchemaRDD
,那将是非常笨拙的代码。
所以我想知道这项任务的最佳方法是什么?