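I am using AWS Athena to query an S3 bucket whose data is partitioned by day only; the partitions look like day=yyyy/mm/dd. When I try to use Glue to update the partitions every day, it creates a new table for each day (about 1,500 tables, since the data goes back to 2017).

I tried using partition projection like this: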

PARTITIONED BY ( 
  day string)

TBLPROPERTIES (
  'has_encrypted_data'='false', 
  'projection.day.format'='yyyy/mm/dd', 
  'projection.day.interval'='1', 
  'projection.day.interval.unit'='DAYS', 
  'projection.day.range'='2017/01/01,NOW', 
  'projection.day.type'='date', 
  'projection.enables'='true'

But the partitions are not updated unless I run MSCK REPAIR TABLE. Any ideas? Am I missing something about partition projection?

2 Answers

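The problem is that you can't use / in the date format. Either replace the / with something else (such as -), or switch to integer formats for year, month, and day and have three partitions.

The three partitions would look like this: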

projection.year.type=integer,
projection.year.range='2017,2025',
projection.year.format='day=${year}',
projection.month.type=integer,
projection.month.range='01,12',
projection.day.type=integer,
projection.day.range='01,31'
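
As a rough illustration of the three-partition approach, the Python sketch below assembles the table properties as a dictionary and renders them as a TBLPROPERTIES clause. The bucket `my-bucket`, the prefix `logs`, and both helper names are hypothetical. Using `projection.<column>.digits` for zero padding and `storage.location.template` to keep the existing `day=yyyy/mm/dd` layout is an assumption about how the table would be set up, not something stated in the answer above.

```python
def integer_projection_properties(location_template, year_range="2017,2025"):
    """Build partition projection properties for integer year/month/day columns."""
    return {
        "projection.enabled": "true",
        "projection.year.type": "integer",
        "projection.year.range": year_range,
        "projection.month.type": "integer",
        "projection.month.range": "1,12",
        "projection.month.digits": "2",  # pad months to 01..12
        "projection.day.type": "integer",
        "projection.day.range": "1,31",
        "projection.day.digits": "2",  # pad days to 01..31
        "storage.location.template": location_template,
    }


def render_tblproperties(props):
    """Render a properties dict as a TBLPROPERTIES clause for a DDL statement."""
    lines = [f"  '{key}'='{value}'" for key, value in props.items()]
    return "TBLPROPERTIES (\n" + ",\n".join(lines) + "\n)"


# Hypothetical bucket and prefix; ${year} etc. are placeholders in the template
TEMPLATE = "s3://my-bucket/logs/day=${year}/${month}/${day}/"
props = integer_projection_properties(TEMPLATE)
print(render_tblproperties(props))
```

With this layout the table would be declared with three partition columns (year, month, day) instead of the single day string column.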

Answered 2021-02-09T13:32:43.880

You don't need to use Glue or MSCK REPAIR TABLE if you are loading partitions using Partition Projection. Just run the CREATE TABLE script once from the query editor and that should be it. If you are loading partitions using Partition Projection, you won't be able to see the partitions in the Glue Data Catalog.
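
To see why no MSCK REPAIR TABLE or crawler run is needed, it can help to picture what a `date` projection does: partition values are computed from the table properties at query time rather than looked up in the catalog. The sketch below is only a mental model in plain Python, not Athena's implementation; the function name and the fixed end date are assumptions. Two details are worth checking in the table properties: the switch that turns projection on is `projection.enabled`, and `projection.<column>.format` takes Java DateTimeFormatter patterns, where months are `MM` (lowercase `mm` means minutes).

```python
from datetime import date, timedelta


def projected_day_values(start, end, pattern="%Y/%m/%d"):
    """Yield one partition value per day from start to end, inclusive."""
    current = start
    while current <= end:
        yield current.strftime(pattern)
        current += timedelta(days=1)


# Stand-in for the range '2017/01/01,NOW', with a fixed end date so the
# result is reproducible
values = list(projected_day_values(date(2017, 1, 1), date(2017, 1, 3)))
print(values)
```

Each generated value corresponds to one partition that a query can reach, which is why nothing has to be registered ahead of time.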

Alternatively, if you want to register the partitions explicitly, the script below may help:

# Import libraries
import boto3
import datetime

# Clients for S3 and Athena
s3 = boto3.client('s3')
athena = boto3.client('athena')

# Parameters for the S3 log location and the Athena table
# Fill these in carefully
s3_bucket = 'sqladmin-cloudtrail'
s3_prefix = 'AWSLogs/XXXXXXXXXXXX/CloudTrail/'
s3_input = 's3://' + s3_bucket + '/' + s3_prefix
s3_output = 's3://aws-athena-query-results-XXXXXXXXXXXXXX-us-east-1'
database = 'athena_log_database'
table_name = 'cloudtrail_logs_table'

# Start an Athena query and return the API response
def run_query(query, database, s3_output):
    query_response = athena.start_query_execution(
        QueryString=query,
        QueryExecutionContext={'Database': database},
        ResultConfiguration={'OutputLocation': s3_output},
    )
    print('Execution ID: ' + query_response['QueryExecutionId'])
    return query_response

# List the region prefixes and add tomorrow's partition for each region
def lambda_handler(event, context):
    # Year, month, and day of tomorrow's partition; timedelta handles
    # month-end and year-end rollover, which date.day + 1 does not
    date = datetime.datetime.now() + datetime.timedelta(days=1)
    athena_year = str(date.year)
    athena_month = str(date.month).rjust(2, '0')
    athena_day = str(date.day).rjust(2, '0')

    result = s3.list_objects(Bucket=s3_bucket, Prefix=s3_prefix, Delimiter='/')
    for region in result.get('CommonPrefixes', []):
        get_region = region.get('Prefix', '').replace(s3_prefix, '').replace('/', '')
        query = (
            "ALTER TABLE " + table_name + " ADD PARTITION (region='"
            + get_region + "',year="
            + athena_year + ",month="
            + athena_month + ",day="
            + athena_day
            + ") location '" + s3_input
            + get_region
            + "/" + athena_year + "/" + athena_month + "/"
            + athena_day + "';"
        )
        # print(get_region)  # for debugging
        # print(query)       # for debugging
        # Run one query per region, inside the loop
        run_query(query, database, s3_output)

You can run a glue job with a similar script to create the partitions daily. Just change the ALTER TABLE part accordingly and it should be good to go.
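
For the single `day=yyyy/mm/dd` partition layout from the question, the changed ALTER TABLE part might look roughly like the sketch below. The table name `my_table`, the bucket `my-bucket`, and the helper name are hypothetical; the function only builds the statement text, which could then be passed to `run_query` from the script above.

```python
from datetime import date


def add_day_partition_sql(table_name, s3_root, day):
    """Return an ALTER TABLE statement that registers one day=yyyy/mm/dd partition."""
    value = day.strftime("%Y/%m/%d")
    location = f"{s3_root.rstrip('/')}/day={value}/"
    return (
        f"ALTER TABLE {table_name} ADD IF NOT EXISTS "
        f"PARTITION (day='{value}') LOCATION '{location}'"
    )


# Hypothetical table name and bucket
sql = add_day_partition_sql("my_table", "s3://my-bucket/logs", date(2021, 2, 9))
print(sql)
```

IF NOT EXISTS keeps a rerun of the daily job from failing when the partition has already been added.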

Answered 2021-02-08T22:03:29.677