I'm using AWS Glue with Apache Hudi to replicate data from RDS to S3. When I run the job below, two parquet files end up in the S3 bucket (basePath): the file from the initial write and the file from the update. In this case I only want the single latest file and would like the old one deleted.
Does anyone know how to keep only the most recent file in the bucket?
import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
spark = SparkSession.builder.config('spark.serializer','org.apache.spark.serializer.KryoSerializer').getOrCreate()
sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# Generate 5 sample insert records with Hudi's QuickstartUtils (stand-in for the RDS data)
dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()
inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(5))
df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.show()
tableName = 'hudi_mor_athena_sample'
bucketName = 'cm-sato-hudi-sample--datalake'
basePath = f's3://{bucketName}/{tableName}'
hudi_options = {
    'hoodie.table.name': tableName,
    'hoodie.datasource.write.storage.type': 'MERGE_ON_READ',
    'hoodie.compact.inline': 'false',
    'hoodie.datasource.write.recordkey.field': 'uuid',
    'hoodie.datasource.write.partitionpath.field': 'partitionpath',
    'hoodie.datasource.write.table.name': tableName,
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.precombine.field': 'ts',
    'hoodie.upsert.shuffle.parallelism': 2,
    'hoodie.insert.shuffle.parallelism': 2,
}
df.write.format("hudi"). \
options(**hudi_options). \
mode("overwrite"). \
save(basePath)
# Generate updated versions of some of the records and upsert them into the same table
updates = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateUpdates(3))
df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df.show()

df.write.format("hudi") \
    .options(**hudi_options) \
    .mode("append") \
    .save(basePath)
job.commit()
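
For reference, the cleaner-related settings below are what I found in the Hudi configuration docs. This is only an untested sketch of what I imagine merging into hudi_options; I'm not sure whether KEEP_LATEST_FILE_VERSIONS with a single retained version would actually make the cleaner delete the older parquet file and leave just the latest file slice:

# Untested sketch: cleaner settings from the Hudi docs that look relevant.
# My guess is that retaining one file version approximates "keep only the
# newest file", but I have not confirmed this behaviour.
cleaner_options = {
    'hoodie.clean.automatic': 'true',
    'hoodie.cleaner.policy': 'KEEP_LATEST_FILE_VERSIONS',
    'hoodie.cleaner.fileversions.retained': 1,
}

df.write.format("hudi") \
    .options(**hudi_options, **cleaner_options) \
    .mode("append") \
    .save(basePath)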