
I am trying to read the last partition written to a table in S3 from a Glue job that reads a dynamic frame using a push-down predicate.

The table I want to read is loaded every day, so a new partition is created for each day's data.

I have another Glue job that will read from this table, but I only want to read the data written to that last partition. I don't want to read the whole table and then filter for the latest data (large data volume, inefficient, costly...) since I can use a push-down predicate. The problem is that the value of the last partition changes every day.
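For context, this is roughly what reading a single, known partition with a push-down predicate looks like (the database, table and partition key names below are placeholders, not my real ones):

from pyspark.context import SparkContext
from awsglue.context import GlueContext

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)

# Hard-coded partition values -- exactly the part that changes every day.
datasource = glueContext.create_dynamic_frame.from_catalog(
    database="my_database",
    table_name="my_table",
    push_down_predicate="partition_0='2022' AND partition_1='02' AND partition_2='16'",
)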

I have tried using boto3 to list the objects in S3 and the get_partitions function to retrieve the values, and I know I can run this query in Athena:

SELECT partition_key, max(partition_value)
FROM information_schema.__internal_partitions__
WHERE table_schema = <database name>
        AND table_name = <table name>
GROUP BY 1
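
For reference, a rough sketch of the get_partitions route I mentioned (untested; the database, table and partition key names are placeholders, and it assumes the partition values are zero-padded year/month/day strings so that tuple comparison picks the latest one):

import boto3

glue = boto3.client("glue")

def latest_partition_values(database, table):
    """Return the greatest partition value tuple found in the Data Catalog."""
    paginator = glue.get_paginator("get_partitions")
    latest = None
    for page in paginator.paginate(DatabaseName=database, TableName=table):
        for partition in page["Partitions"]:
            values = tuple(partition["Values"])  # e.g. ('2022', '02', '16')
            if latest is None or values > latest:
                latest = values
    return latest

year, month, day = latest_partition_values("my_database", "my_table")
partitionPredicate = f"partition_0='{year}' AND partition_1='{month}' AND partition_2='{day}'"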

But is there a simpler way to do this within the Glue job?

Thanks


1 Answer


Here is the solution I came up with: use Boto3 to look up the job bookmark information, then use the latest bookmark timestamp to dynamically build a partition predicate covering yesterday's, today's and tomorrow's partitions. That way I limit the data scan to a three-day range of files. You can obviously adapt it to your needs.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.dynamicframe import DynamicFrame
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import *

from time import strftime
import boto3
from botocore.exceptions import ClientError
from datetime import datetime, timedelta
import json

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Dynamically build the partitionPredicate from the job bookmark; fall back
# to today's date if no bookmark exists yet (e.g. on the first run).
glue_client = boto3.client('glue')
predicate_date = datetime.today()
try:
    response = glue_client.get_job_bookmark(JobName=args['JOB_NAME'])
    JobBookmark = json.loads(response["JobBookmarkEntry"]["JobBookmark"])
    predicate_date = datetime.strptime(JobBookmark["datasource0"]["timestamps"]["CURR_RUN_START_TIME"], '%Y-%m-%dT%H:%M:%S.%fZ')
except ClientError as e:
    # raise Exception("boto3 client error in retrieves_details_of_bookmarked_job: " + e.__str__())
    print("No bookmark available.")
except Exception as e:
    # raise Exception("Unexpected error in retrieves_details_of_bookmarked_job: " + e.__str__())
    print("No bookmark available.")

yesterday = predicate_date + timedelta(days = -1)
tomorrow = predicate_date + timedelta(days = 1)

# Having a predicate greatly helps performance. It can also be used to establish a bookmark.
# partition_0 = year, partition_1 = month, partition_2 = day.
partitionPredicate = (
    "(partition_0='{year}' OR partition_0='{year_2}' OR partition_0='{year_3}') AND "
    "(partition_1='{month}' OR partition_1='{month_2}' OR partition_1='{month_3}') AND "
    "(partition_2='{day}' OR partition_2='{day_2}' OR partition_2='{day_3}')"
).format(
    year=predicate_date.strftime("%Y"), month=predicate_date.strftime("%m"), day=predicate_date.strftime("%d"),
    year_2=tomorrow.strftime("%Y"), month_2=tomorrow.strftime("%m"), day_2=tomorrow.strftime("%d"),
    year_3=yesterday.strftime("%Y"), month_3=yesterday.strftime("%m"), day_3=yesterday.strftime("%d"),
)

print(partitionPredicate)

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "my_database", table_name = "my_table", transformation_ctx = "datasource0", push_down_predicate = partitionPredicate)
print('Initial Row Count: '+str(datasource0.count()))

job.commit()
answered 2022-02-16T16:53:29.630