1

我正在尝试从 PostgreSQL 表中并行读取数据。我使用时间戳列作为分区列,并提供下限、上限和 numPartitions 的值。它正在创建多个查询以并行读取数据,但并未将过滤器下推到 PostgreSQL 数据库。当我在数据框上使用解释命令时,物理计划中的推送过滤器中没有任何内容。我也尝试在加载方法之后应用过滤器子句,但它仍然没有按下过滤器。

选项1:这里我没有使用过滤条件

 val df = spark.read
        .format("jdbc")
        .option("url", jdbcurl)
        .option("dbtable", query)
        .option("partitionColumn","transactionbegin")
        .option("numPartitions",12) 
        .option("driver", "org.postgresql.Driver")
        .option("fetchsize", 50000)  
        .option("user","user")
        .option("password", "password")
        .option("lowerBound","2018-01-01 00:00:00")
        .option("upperBound","2018-12-31 23:59:00")
        .load

解释计划输出

== Physical Plan ==
*(1) Scan JDBCRelation((   SELECT columnnames
FROM schema.Transaction ) a) [numPartitions=12] [columnnames] PushedFilters: [], ReadSchema: struct<columnnames>

现在,如果我在 df 上进行解释,则推送的过滤器中没有任何内容,但是我能够使用 pg_stat_activity 从 PostgreSQL 获取的查询显示了 12 个具有 where 条件的不同查询。我在这里提供一个查询。

SELECT 1 FROM (   SELECT columnnames
FROM schema.Transaction ) a WHERE "transactionbegin" >= '2018-03-02 19:59:50' AND "transactionbegin" < '2018-04-02 05:59:45'

我在这里有点困惑,无论是过滤 PostgreSQL 中的记录还是在 spark 中根据解释计划进行过滤,您在推送的过滤器中没有任何内容,但根据生成的查询,它看起来像是在过滤PostgreSQL 中的数据。

选项 2:使用过滤条件

val df = spark.read
        .format("jdbc")
        .option("url", jdbcurl)
        .option("dbtable", query)
        .option("partitionColumn","transactionbegin")
        .option("numPartitions",12) 
        .option("driver", "org.postgresql.Driver")
        .option("fetchsize", 50000)  
        .option("user","user")
        .option("password", "password")
        .option("lowerBound","2018-01-01 00:00:00")
        .option("upperBound","2018-12-31 23:59:00")
        .load.filter(s"TransactionBegin between cast('2018-01-01 00:00:00' as TIMESTAMP) and cast('2018-12-31 23:59:00' as TIMESTAMP)")

解释上述数据框的计划

== Physical Plan ==
*(1) Scan JDBCRelation((   SELECT columnnames
FROM schema.Transaction ) a) [numPartitions=12] [columnnames] 
PushedFilters: [*IsNotNull(transactionbegin), *GreaterThanOrEqual(transactionbegin,2018-01-01 00:00:00.0),..., ReadSchema: struct<columnnames>

使用 pg_stat_activity 来自 PostgreSQL 的查询之一

SELECT 1 FROM (   SELECT columnnames
FROM schema.Transaction ) a 
WHERE (("transactionbegin" IS NOT NULL) AND ("transactionbegin" >= '2018-01-01 00:00:00.0') 
AND ("transactionbegin" <= '2018-12-31 23:59:00.0')) AND 
("transactionbegin" >= '2018-06-02 01:59:35' AND "transactionbegin" < '2018-07-02 11:59:30')

我想了解的是,为什么在提供分区列和下限和上限时,它没有将过滤器推送到数据库,而是在通过将值转换为时间戳来应用显式过滤器之后,它会将过滤器向下推。框架也不应该足够聪明,将我们传递的值视为下限和上限,以将其视为时间戳列的范围。

如果您有大量数据需要在过滤条件之后读取,那么最有效的处理方法是什么?

4

0 回答 0