这是我想出的解决方案:使用 Boto3 查询 Glue 作业书签(job bookmark)信息,然后根据最新的书签时间动态构建分区谓词,覆盖该日期的前一天、当天和后一天的分区。这样,数据扫描就被限制在 3 天的文件范围内。当然,你可以根据自己的需要修改它。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.dynamicframe import DynamicFrame
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import *
from time import strftime
import boto3
from botocore.exceptions import ClientError
from datetime import datetime, timedelta
import json
## @params: [JOB_NAME]
# Resolve the arguments passed to the script; only JOB_NAME is requested.
args = getResolvedOptions(sys.argv, ["JOB_NAME"])

# Create the Spark context and wrap it in a Glue context.
sc = SparkContext()
glueContext = GlueContext(sc)

# Initialise the Glue job with its name and the resolved arguments.
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Spark session exposed by the Glue context.
spark = glueContext.spark_session
# Dynamically build the partitionPredicate
BOOKMARK_TIMESTAMP_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'


def read_bookmark_start_time(job_name, source_ctx="datasource0"):
    """Return the CURR_RUN_START_TIME stored in the job bookmark.

    Args:
        job_name: Name of the Glue job whose bookmark is looked up.
        source_ctx: Key of the bookmark entry to read; the script reads
            the entry named "datasource0".

    Returns:
        The recorded run start time parsed into a naive ``datetime``.

    Raises:
        Any error from the Glue API call, the JSON decoding, the key lookups
        or the timestamp parsing is propagated to the caller.
    """
    # The client must be created here: without it the lookup can never run.
    glue_client = boto3.client("glue")
    response = glue_client.get_job_bookmark(JobName=job_name)
    bookmark = json.loads(response["JobBookmarkEntry"]["JobBookmark"])
    start_time = bookmark[source_ctx]["timestamps"]["CURR_RUN_START_TIME"]
    return datetime.strptime(start_time, BOOKMARK_TIMESTAMP_FORMAT)


def build_partition_predicate(days):
    """Return a predicate that matches exactly the partitions of *days*.

    Each day contributes one ``(year AND month AND day)`` clause and the
    clauses are OR-ed together.  Combining separate year/month/day OR-groups
    with AND would instead select their cross product, which pulls in
    unrelated partitions whenever the days span a month or year boundary.

    Args:
        days: Iterable of ``date``/``datetime`` objects to select.

    Returns:
        The predicate string over partition_0 (year), partition_1 (month)
        and partition_2 (day).
    """
    clause = "(partition_0='{year}' AND partition_1='{month}' AND partition_2='{day}')"
    return " OR ".join(
        clause.format(
            year=day.strftime("%Y"),
            month=day.strftime("%m"),
            day=day.strftime("%d"),
        )
        for day in days
    )


# Fall back to the current date when no usable bookmark exists.
# NOTE(review): the bookmark timestamp format ends in 'Z', so this fallback
# presumably should be UTC as well -- confirm the timezone of the job host.
predicate_date = datetime.today()
try:
    predicate_date = read_bookmark_start_time(args['JOB_NAME'])
except Exception as exc:
    # Deliberately best effort: any failure keeps today's date, but the
    # reason is printed so a broken lookup is not mistaken for "no bookmark".
    print("No bookmark available.")
    print("Bookmark lookup failed: {!r}".format(exc))

yesterday = predicate_date + timedelta(days=-1)
tomorrow = predicate_date + timedelta(days=1)

# Having a predicate greatly helps in the performance. It can also be used to establish a bookmark
partitionPredicate = build_partition_predicate((yesterday, predicate_date, tomorrow))
print(partitionPredicate)
# Load the catalog table, passing the partition predicate so that only the
# selected partitions are read.
datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database="my_database",
    table_name="my_table",
    transformation_ctx="datasource0",
    push_down_predicate=partitionPredicate,
)

# Report how many rows were read.
initial_row_count = datasource0.count()
print("Initial Row Count: {}".format(initial_row_count))

# Commit the job now that the read is complete.
job.commit()