4

下面的查询扫描 100 mb 的数据。

select * from table where column1 = 'val' and partition_id = '20190309';

但是下面的查询扫描 15 GB 的数据(有超过 90 个分区)

select * from table where column1 = 'val' and partition_id in (select max(partition_id) from table);

如何优化第二个查询以扫描与第一个相同数量的数据?

4

3 回答 3

9

这里有两个问题。上面标量子查询的效率select max(partition_id) from table,以及@PiotrFindeisen 指出的动态过滤的效率。

第一个问题是对 Hive 表的分区键的查询比它们看起来要复杂得多。大多数人会认为,如果您想要分区键的最大值,您可以简单地对分区键执行查询,但这不起作用,因为 Hive 允许分区为空(并且它还允许非空文件不包含任何行)。具体来说,上面的标量子查询select max(partition_id) from table要求Trino(以前称为 PrestoSQL)找到包含至少一行的最大分区。理想的解决方案是在 Hive 中拥有完美的统计信息,但如果不这样做,引擎将需要为 Hive 提供自定义逻辑,以打开分区文件,直到找到非空文件。

如果您确定您的仓库不包含空分区(或者如果您对它的含义感到满意),则可以将标量子查询替换为隐藏$partitions表上的一个“

select * 
from table 
where column1 = 'val' and 
    partition_id = (select max(partition_id) from "table$partitions");

第二个问题是@PiotrFindeisen 指出的问题,它与查询计划和执行的方式有关。大多数人会查看上面的查询,发现引擎显然应该select max(partition_id) from "table$partitions"在计划期间计算出值,将其内联到计划中,然后继续优化。不幸的是,一般来说,这是一个相当复杂的决定,因此引擎只是将其建模为广播连接,其中一部分执行计算出该值,并将该值广播给其他工作人员。问题是执行的其余部分无法将此新信息添加到现有处理中,因此它只是扫描所有数据,然后过滤掉您试图跳过的值。有一个项目正在进行中以添加此动态过滤,但还没有完成。

这意味着您今天能做的最好的事情就是运行两个单独的查询:一个获取最大 partition_id,另一个获取内联值。

顺便说一句,Presto 0.199中添加了隐藏的“$partitions”表,我们修复了0.201中的一些小错误。我不确定 Athena 基于哪个版本,但我相信它已经过时了(我写这个答案时的当前版本是309

于 2019-04-27T20:01:34.523 回答
2

编辑:Presto在其0.193 版本__internal_partitions__中删除了该表,因此我建议不要在任何生产系统中使用以下部分中定义的解决方案,因为 Athena '透明地'更新了 presto 版本。我最终只使用了幼稚的查询,但也使用了本节中概述的相同的回溯技巧。它比使用table 慢大约 3 倍,但至少当 Athena 决定更新他们的 presto 版本时它不会中断。Slow aggregation queries for partition keysSELECT max(partition_date) ...Lack of Dynamic Filtering__internal_partitions__

----- 原帖 -----

所以我想出了一个相当老套的方法来为大型数据集上的基于日期的分区完成此操作,因为当您只需要回顾几个分区的数据以匹配最大值时,请注意我不是 100% 确定information_schema.__internal_partitions__桌子的使用有多脆弱。

正如上面提到的@Dain,确实有两个问题。第一个是 max(partition_date) 查询的聚合有多慢,第二个是 Presto 缺乏对动态过滤的支持。

分区键的慢速聚合查询

为了解决第一个问题,我使用了该information_schema.__internal_partitions__表,它允许我快速聚合表的分区,而无需扫描文件中的数据。(请注意,partition_value以下查询中的partition_key、 和partition_number都是表的列名,__internal_partitions__与表的列无关)

如果您的表只有一个分区键,您可以执行以下操作:

SELECT max(partition_value) FROM information_schema.__internal_partitions__
WHERE table_schema = 'DATABASE_NAME' AND table_name = 'TABLE_NAME'

但是如果你有多个分区键,你需要更多这样的东西:

SELECT max(partition_date) as latest_partition_date from (
  SELECT max(case when partition_key = 'partition_date' then partition_value end) as partition_date, max(case when partition_key = 'another_partition_key' then partition_value end) as another_partition_key
  FROM information_schema.__internal_partitions__
  WHERE table_schema = 'DATABASE_NAME' AND table_name = 'TABLE_NAME'
  GROUP BY partition_number
)
WHERE
  -- ... Filter down by values for e.g. another_partition_key
)

这些查询应该运行得相当快(我的运行大约需要 1-2 秒),而无需扫描文件中的实际数据,但同样,我不确定使用这种方法是否有任何问题。

缺乏动态过滤

对于我的特定用例,我能够减轻第二个问题的最坏影响,因为我希望在从当前日期开始的有限时间内总是有一个分区(例如,我可以保证任何数据生产或分区-加载问题将在 3 天内得到解决)。事实证明,Athena 在使用 presto 的datetime 函数时确实做了一些预处理,因此动态过滤与使用子查询没有相同类型的问题。

因此,您可以使用 datetime 函数更改查询以限制实际最大值的回溯距离,从而限制扫描的数据量。

SELECT * FROM "DATABASE_NAME"."TABLE_NAME"
WHERE partition_date >= cast(date '2019-06-25' - interval '3' day as varchar) -- Will only scan partitions from 3 days before '2019-06-25'
AND partition_date = (
  -- Insert the partition aggregation query from above here
)
于 2019-06-25T20:38:36.430 回答
2

我不知道它是否仍然相关,但刚刚发现:

代替:

select * from table where column1 = 'val' and partition_id in (select max(partition_id) from table);

利用:

select a.* from table a 
inner join (select max(partition_id) max_id from table) b on a.partition_id=b.max_id
where column1 = 'val';

我认为这与使用分区的连接优化有关。

于 2020-02-04T10:48:52.740 回答