有多种方法可以安排此任务。您如何安排工作流程?您使用Airflow、Luigi、Azkaban、cron 等系统还是使用AWS 数据管道?
从其中任何一个中,您应该能够触发以下 CLI 命令。
$ aws athena start-query-execution --query-string "MSCK REPAIR TABLE some_database.some_table" --result-configuration "OutputLocation=s3://SOMEPLACE"
另一种选择是AWS Lambda。您可以有一个函数调用MSCK REPAIR TABLE some_database.some_table
以响应对 S3 的新上传。
一个示例 Lambda 函数可以这样编写:
import boto3
def lambda_handler(event, context):
bucket_name = 'some_bucket'
client = boto3.client('athena')
config = {
'OutputLocation': 's3://' + bucket_name + '/',
'EncryptionConfiguration': {'EncryptionOption': 'SSE_S3'}
}
# Query Execution Parameters
sql = 'MSCK REPAIR TABLE some_database.some_table'
context = {'Database': 'some_database'}
client.start_query_execution(QueryString = sql,
QueryExecutionContext = context,
ResultConfiguration = config)
然后,您将配置一个触发器以DATA/
在您的存储桶的前缀下添加新数据时执行您的 Lambda 函数。
最终,在使用作业调度程序运行 Spark 作业后显式重建分区具有自我记录的优势。另一方面,AWS Lambda 对于这样的工作很方便。