9

我正在尝试在单个节点(本地 [*])上以独立模式通过 JDBC 访问中型 Teradata 表(约 1 亿行)。

我正在使用 Spark 1.4.1。并且设置在非常强大的机器上(2 cpu,24 核,126G RAM)。

我尝试了几种内存设置和调整选项以使其工作得更快,但它们都没有产生巨大的影响。

我确信我遗漏了一些东西,下面是我的最后一次尝试,它花了大约 11 分钟来获得这个简单的计数,而使用通过 R 的 JDBC 连接只花了 40 秒来获得计数。

bin/pyspark --driver-memory 40g --executor-memory 40g

df = sqlContext.read.jdbc("jdbc:teradata://......)
df.count()

当我尝试使用 BIG 表(5B 记录)时,查询完成后没有返回任何结果。

4

3 回答 3

16

所有的聚合操作都是在整个数据集被检索到内存中的DataFrame集合之后执行的。因此,在 Spark 中进行计数永远不会像直接在 TeraData 中那样高效。有时通过创建视图然后使用 JDBC API 映射这些视图来将一些计算推送到数据库中是值得的。

每次使用 JDBC 驱动程序访问大表时,您都应该指定分区策略,否则您将创建一个带有单个分区的DataFrame/ ,并且您将重载单个 JDBC 连接。RDD

相反,您想尝试以下 AI(自 Spark 1.4.0+ 起):

sqlctx.read.jdbc(
  url = "<URL>",
  table = "<TABLE>",
  columnName = "<INTEGRAL_COLUMN_TO_PARTITION>", 
  lowerBound = minValue,
  upperBound = maxValue,
  numPartitions = 20,
  connectionProperties = new java.util.Properties()
)

还有一个选项可以下推一些过滤。

如果您没有均匀分布的整数列,您希望通过指定自定义谓词(where语句)来创建一些自定义分区。例如,假设您有一个时间戳列并希望按日期范围进行分区:

    val predicates = 
  Array(
    "2015-06-20" -> "2015-06-30",
    "2015-07-01" -> "2015-07-10",
    "2015-07-11" -> "2015-07-20",
    "2015-07-21" -> "2015-07-31"
  )
  .map {
    case (start, end) => 
      s"cast(DAT_TME as date) >= date '$start'  AND cast(DAT_TME as date) <= date '$end'"
  }

 predicates.foreach(println) 

// Below is the result of how predicates were formed 
//cast(DAT_TME as date) >= date '2015-06-20'  AND cast(DAT_TME as date) <= date '2015-06-30'
//cast(DAT_TME as date) >= date '2015-07-01'  AND cast(DAT_TME as date) <= date '2015-07-10'
//cast(DAT_TME as date) >= date '2015-07-11'  AND cast(DAT_TME as date) <= date //'2015-07-20'
//cast(DAT_TME as date) >= date '2015-07-21'  AND cast(DAT_TME as date) <= date '2015-07-31'


sqlctx.read.jdbc(
  url = "<URL>",
  table = "<TABLE>",
  predicates = predicates,
  connectionProperties = new java.util.Properties()
)

它将生成一个DataFramewhere 每个分区将包含与不同谓词关联的每个子查询的记录。

检查DataFrameReader.scala的源代码

于 2015-09-30T14:49:53.860 回答
5

未序列化的表是否适合 40 GB?如果它开始交换磁盘性能将急剧下降。

无论如何,当您使用带有 ansi SQL 语法的标准 JDBC 时,您会利用 DB 引擎,因此如果 teradata(我不知道 teradata)保存有关您的表的统计信息,那么经典的“从表中选择计数(*)”将非常快。相反,spark 正在使用“select * from table”之类的内容将 1 亿行加载到内存中,然后将对 RDD 行进行计数。这是一个完全不同的工作量。

于 2015-08-24T18:24:54.193 回答
0

与其他解决方案不同的一种解决方案是将 oracle 表中的数据保存在 avro 文件中(在许多文件中分区)保存在 hadoop 上。这样用 spark 读取那些 avro 文件会很轻松,因为你不会再调用 db 了。

于 2019-08-19T20:50:21.370 回答