我必须使用带有 Spark (2.2) 的 JDBC 从 Oracle 数据库中读取数据。为了最小化传输的数据,我使用了一个下推查询,它已经过滤了要加载的数据。然后将该数据附加到现有的 Hive 表中。为了记录已加载的内容,我计算了通过 JDBC 加载的记录。我的代码基本上是这样的:
val query = "(select a, b from table where c = 1) t"
val myDF = spark.read.jdbc(jdbc_url, query, partitionColumn, lowerBound, upperBound, 10, connectionProperties).cache()
myDF.write.mode(SaveMode.Append)
.format("parquet")
.saveAsTable("my_table_name")
val numRecs = myDF.count()
我的假设是,由于 cache(),DataFrame 通过 JDBC 读取一次,保存并用于计算记录。但是当我查看在 Oracle 中创建的会话时,我看起来像是计数操作本身在 Oracle 数据库上创建了 10 个会话。起初我看到 10 个会话,基本上是这样的:
SELECT * FROM (select a, b from table where c = 1) t WHERE a >= x AND < a y
并且,在完成之后,另外 10 个会话看起来像这样:
SELECT 1 FROM (select a, b from table where c = 1) t WHERE a >= x AND < a y
因此,看起来 Spark 从 JDBC 源加载数据只是为了计算记录,而使用已经加载的数据应该已经足够了。如何防止这种情况发生并且 Spark 只能从 JDBC 源读取一次数据?
更新
事实证明,我是盲目的:在调用 saveAsTable 之前,我的代码中有另一个 count() 。所以这完全有道理,在 DataFrame 上调用的第一个动作确实是 count()。消除这一点后,它的行为与预期的一样。