All the aggregations are performed only after the whole dataset has been retrieved into memory as a DataFrame collection. So doing a count in Spark will never be as efficient as doing it directly in TeraData. Sometimes it is worth pushing some computation into the database by creating views and then mapping those views with the JDBC API.
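For example, a minimal sketch of mapping such a view through the JDBC reader might look like the following (the view name DAILY_AGG_VIEW and the credentials are placeholders; the heavy aggregation is assumed to be defined inside the database view):

val props = new java.util.Properties()
props.setProperty("user", "<USER>")
props.setProperty("password", "<PASSWORD>")

// The aggregation runs inside the database; Spark only retrieves the
// (much smaller) pre-aggregated result exposed by the view.
val dailyAgg = sqlctx.read.jdbc(
  url = "<URL>",
  table = "DAILY_AGG_VIEW",
  properties = props
)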
Every time you access a large table through the JDBC driver you should specify a partitioning strategy, otherwise you will create a DataFrame/RDD with a single partition and you will overload the single JDBC connection.
Instead, you want to try the following API (since Spark 1.4.0+):
sqlctx.read.jdbc(
  url = "<URL>",
  table = "<TABLE>",
  // Integral column used to split the table into partitions.
  columnName = "<INTEGRAL_COLUMN_TO_PARTITION>",
  // Min/max of that column; used only to compute the partition strides.
  lowerBound = minValue,
  upperBound = maxValue,
  numPartitions = 20,
  connectionProperties = new java.util.Properties()
)
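If the bounds of the partitioning column are not known up front, one way to obtain minValue and maxValue is to push a tiny MIN/MAX query into the database through the same reader. This is only a sketch; the column placeholder and the subquery alias bounds are assumptions:

// The derived table is evaluated on the database side, so only a single
// row containing the two bounds is transferred to Spark.
val boundsDF = sqlctx.read.jdbc(
  url = "<URL>",
  table = "(SELECT MIN(<INTEGRAL_COLUMN_TO_PARTITION>) AS min_val, " +
          "MAX(<INTEGRAL_COLUMN_TO_PARTITION>) AS max_val FROM <TABLE>) bounds",
  properties = new java.util.Properties()
)

val row = boundsDF.collect().head
val minValue = row.getAs[Number](0).longValue()
val maxValue = row.getAs[Number](1).longValue()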
There is also an option to push down some filtering.
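As a rough illustration (the column AMOUNT is hypothetical), simple comparison filters applied to a JDBC-backed DataFrame are compiled into the WHERE clause of the SQL that Spark sends to the database, so the rows are filtered on the database side:

val df = sqlctx.read.jdbc(
  url = "<URL>",
  table = "<TABLE>",
  properties = new java.util.Properties()
)

// The comparison below is pushed down into the generated SQL query
// instead of being evaluated row by row inside Spark.
val bigOrders = df.filter(df("AMOUNT") > 1000)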
If you don't have an evenly distributed integral column, you will want to create some custom partitions by specifying custom predicates (where clauses). For example, let's assume you have a timestamp column and want to partition by date ranges:
val predicates =
  Array(
    "2015-06-20" -> "2015-06-30",
    "2015-07-01" -> "2015-07-10",
    "2015-07-11" -> "2015-07-20",
    "2015-07-21" -> "2015-07-31"
  ).map { case (start, end) =>
    s"cast(DAT_TME as date) >= date '$start' AND cast(DAT_TME as date) <= date '$end'"
  }
predicates.foreach(println)
// Below is the result of how predicates were formed
//cast(DAT_TME as date) >= date '2015-06-20' AND cast(DAT_TME as date) <= date '2015-06-30'
//cast(DAT_TME as date) >= date '2015-07-01' AND cast(DAT_TME as date) <= date '2015-07-10'
//cast(DAT_TME as date) >= date '2015-07-11' AND cast(DAT_TME as date) <= date '2015-07-20'
//cast(DAT_TME as date) >= date '2015-07-21' AND cast(DAT_TME as date) <= date '2015-07-31'
sqlctx.read.jdbc(
  url = "<URL>",
  table = "<TABLE>",
  predicates = predicates,
  connectionProperties = new java.util.Properties()
)
It will generate a DataFrame in which each partition contains the records of the subquery associated with one of the predicates.
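As a quick sanity check (a sketch assuming the call above is assigned to a value named df), one partition is created per predicate:

val df = sqlctx.read.jdbc(
  url = "<URL>",
  table = "<TABLE>",
  predicates = predicates,
  connectionProperties = new java.util.Properties()
)

// Four predicates were supplied above, so this prints 4.
println(df.rdd.partitions.length)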
Check the source code of DataFrameReader.scala.