39

我有一个每小时执行一次的 Spark 批处理作业。S3每次运行都会使用目录命名模式生成和存储新数据DATA/YEAR=?/MONTH=?/DATE=?/datafile

将数据上传到 后S3,我想使用Athena. 另外,我想QuickSight通过连接到 Athena 作为数据源来将它们可视化。

问题是每次运行我的 Spark 批处理后,S3Athena 不会发现存储在其中的新生成的数据,除非我手动运行查询MSCK REPAIR TABLE

有没有办法让 Athena 自动更新数据,这样我就可以创建一个全自动的数据可视化管道?

4

3 回答 3

25

有多种方法可以安排此任务。您如何安排工作流程?您使用AirflowLuigiAzkaban、cron 等系统还是使用AWS 数据管道

从其中任何一个中,您应该能够触发以下 CLI 命令。

$ aws athena start-query-execution --query-string "MSCK REPAIR TABLE some_database.some_table" --result-configuration "OutputLocation=s3://SOMEPLACE"

另一种选择是AWS Lambda。您可以有一个函数调用MSCK REPAIR TABLE some_database.some_table以响应对 S3 的新上传。

一个示例 Lambda 函数可以这样编写:

import boto3

def lambda_handler(event, context):
    bucket_name = 'some_bucket'

    client = boto3.client('athena')

    config = {
        'OutputLocation': 's3://' + bucket_name + '/',
        'EncryptionConfiguration': {'EncryptionOption': 'SSE_S3'}

    }

    # Query Execution Parameters
    sql = 'MSCK REPAIR TABLE some_database.some_table'
    context = {'Database': 'some_database'}

    client.start_query_execution(QueryString = sql, 
                                 QueryExecutionContext = context,
                                 ResultConfiguration = config)

然后,您将配置一个触发器以DATA/在您的存储桶的前缀下添加新数据时执行您的 Lambda 函数。

最终,在使用作业调度程序运行 Spark 作业后显式重建分区具有自我记录的优势。另一方面,AWS Lambda 对于这样的工作很方便。

于 2017-11-29T14:12:45.440 回答
7

您应该ADD PARTITION改为运行:

aws athena start-query-execution --query-string "ALTER TABLE ADD PARTITION..."

这会从您的位置添加一个新创建的分区S3Athena 利用 Hive 对数据进行分区。要创建带有分区的表,您必须在CREATE TABLE语句期间对其进行定义。用于PARTITIONED BY定义分区数据的键。

于 2018-12-14T20:56:20.630 回答
4

有多种方法可以解决问题并更新表格:

  1. 打电话MSCK REPAIR TABLE。这将扫描所有数据。这是昂贵的,因为每个文件都被完整读取(至少它是由 AWS 完全收费的)。它也非常缓慢。简而言之:不要这样做!

  2. 通过调用自己创建分区ALTER TABLE ADD PARTITION abc ...。从某种意义上说,这很好,无需扫描数据并且成本低。查询也很快,所以这里没有问题。如果您的文件结构非常混乱,没有任何通用模式(在您看来并非如此,因为它是一个组织良好的 S3 密钥模式),这也是一个不错的选择。这种方法也有缺点: A)很难维护 B)所有分区都将存储在 GLUE 目录中。当您有很多分区时,这可能会成为一个问题,因为它们需要被读取并传递给 Athenas 和 EMR Hadoop 基础架构。

  3. 使用分区投影。您可能想要评估两种不同的风格。这是在查询时为 Hadoop 创建分区的变体。这意味着没有 GLUE 目录条目通过网络发送,因此可以更快地处理大量分区。缺点是您可能会“碰到”一些可能不存在的分区。这些当然会被忽略,但在内部将生成所有可能与您的查询匹配的分区 - 无论它们是否在 S3 上(因此始终向您的查询添加分区过滤器!)。如果正确完成,此选项是一种即发即的方法,因为不需要更新。

CREATE EXTERNAL TABLE `mydb`.`mytable`
(
   ...
)
  PARTITIONED BY (
    `YEAR` int,
    `MONTH` int,
    `DATE` int)
  ...
  LOCATION
    's3://DATA/'
  TBLPROPERTIES(
      "projection.enabled" = "true",
      "projection.account.type" = "integer",
      "projection.account.range" = "1,50",
      "projection.YEAR.type" = "integer",
      "projection.YEAR.range" = "2020,2025",
      "projection.MONTH.type" = "integer",
      "projection.MONTH.range" = "1,12",
      "projection.DATE.type" = "integer",
      "projection.DATE.range" = "1,31",
      "storage.location.template" = "s3://DATA/YEAR=${YEAR}/MONTH=${MONTH}/DATE=${DATE}/"
  );

https://docs.aws.amazon.com/athena/latest/ug/partition-projection.html

  1. 仅列出所有选项:您也可以使用GLUE crawlers. 但这似乎不是一个有利的方法,因为它不像宣传的那样灵活。

  2. 您可以直接使用 GLUE 获得更多控制权,如果您有很多自动化脚本来完成设置表格的准备工作Glue Data Catalog API,这可能是方法2的替代方法。

简而言之:

  • 如果您的应用程序以 SQL 为中心,您喜欢没有脚本的最精简方法,请使用分区投影
  • 如果您有很多分区,请使用分区投影
  • 如果您有几个分区或分区没有通用模式,请使用方法 #2
  • 如果您的脚本繁重,并且脚本完成了大部分工作并且更容易为您处理,请考虑方法 #5
  • 如果您感到困惑并且不知道从哪里开始 - 先尝试分区投影!它应该适合 95% 的用例。
于 2021-04-21T07:12:01.487 回答