我正在尝试从 PostgreSQL 表中并行读取数据。我使用时间戳列作为分区列,并提供下限、上限和 numPartitions 的值。它正在创建多个查询以并行读取数据,但并未将过滤器下推到 PostgreSQL 数据库。当我在数据框上使用解释命令时,物理计划中的推送过滤器中没有任何内容。我也尝试在加载方法之后应用过滤器子句,但它仍然没有按下过滤器。
选项1:这里我没有使用过滤条件
val df = spark.read
.format("jdbc")
.option("url", jdbcurl)
.option("dbtable", query)
.option("partitionColumn","transactionbegin")
.option("numPartitions",12)
.option("driver", "org.postgresql.Driver")
.option("fetchsize", 50000)
.option("user","user")
.option("password", "password")
.option("lowerBound","2018-01-01 00:00:00")
.option("upperBound","2018-12-31 23:59:00")
.load
解释计划输出
== Physical Plan ==
*(1) Scan JDBCRelation(( SELECT columnnames
FROM schema.Transaction ) a) [numPartitions=12] [columnnames] PushedFilters: [], ReadSchema: struct<columnnames>
现在,如果我在 df 上进行解释,则推送的过滤器中没有任何内容,但是我能够使用 pg_stat_activity 从 PostgreSQL 获取的查询显示了 12 个具有 where 条件的不同查询。我在这里提供一个查询。
SELECT 1 FROM ( SELECT columnnames
FROM schema.Transaction ) a WHERE "transactionbegin" >= '2018-03-02 19:59:50' AND "transactionbegin" < '2018-04-02 05:59:45'
我在这里有点困惑,无论是过滤 PostgreSQL 中的记录还是在 spark 中根据解释计划进行过滤,您在推送的过滤器中没有任何内容,但根据生成的查询,它看起来像是在过滤PostgreSQL 中的数据。
选项 2:使用过滤条件
val df = spark.read
.format("jdbc")
.option("url", jdbcurl)
.option("dbtable", query)
.option("partitionColumn","transactionbegin")
.option("numPartitions",12)
.option("driver", "org.postgresql.Driver")
.option("fetchsize", 50000)
.option("user","user")
.option("password", "password")
.option("lowerBound","2018-01-01 00:00:00")
.option("upperBound","2018-12-31 23:59:00")
.load.filter(s"TransactionBegin between cast('2018-01-01 00:00:00' as TIMESTAMP) and cast('2018-12-31 23:59:00' as TIMESTAMP)")
解释上述数据框的计划
== Physical Plan ==
*(1) Scan JDBCRelation(( SELECT columnnames
FROM schema.Transaction ) a) [numPartitions=12] [columnnames]
PushedFilters: [*IsNotNull(transactionbegin), *GreaterThanOrEqual(transactionbegin,2018-01-01 00:00:00.0),..., ReadSchema: struct<columnnames>
使用 pg_stat_activity 来自 PostgreSQL 的查询之一
SELECT 1 FROM ( SELECT columnnames
FROM schema.Transaction ) a
WHERE (("transactionbegin" IS NOT NULL) AND ("transactionbegin" >= '2018-01-01 00:00:00.0')
AND ("transactionbegin" <= '2018-12-31 23:59:00.0')) AND
("transactionbegin" >= '2018-06-02 01:59:35' AND "transactionbegin" < '2018-07-02 11:59:30')
我想了解的是,为什么在提供分区列和下限和上限时,它没有将过滤器推送到数据库,而是在通过将值转换为时间戳来应用显式过滤器之后,它会将过滤器向下推。框架也不应该足够聪明,将我们传递的值视为下限和上限,以将其视为时间戳列的范围。
如果您有大量数据需要在过滤条件之后读取,那么最有效的处理方法是什么?