
I am using AWS Glue and Apache Hudi to replicate data from RDS to S3. When I run the job below, two parquet files (the initial file and the updated file) end up in the S3 bucket (basePath). In this case I only want the single latest file and would like the old one deleted.

Does anyone know how to keep only the single latest file in the bucket?

import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

spark = SparkSession.builder.config('spark.serializer','org.apache.spark.serializer.KryoSerializer').getOrCreate()
sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# generate sample insert records with Hudi's QuickstartUtils
dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()
inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(5))
df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.show()

tableName = 'hudi_mor_athena_sample' 
bucketName = 'cm-sato-hudi-sample--datalake'
basePath = f's3://{bucketName}/{tableName}'

hudi_options = {
  'hoodie.table.name': tableName,
  'hoodie.datasource.write.storage.type': 'MERGE_ON_READ',
  'hoodie.compact.inline': 'false',
  'hoodie.datasource.write.recordkey.field': 'uuid', 
  'hoodie.datasource.write.partitionpath.field': 'partitionpath',
  'hoodie.datasource.write.table.name': tableName,
  'hoodie.datasource.write.operation': 'upsert', 
  'hoodie.datasource.write.precombine.field': 'ts', 
  'hoodie.upsert.shuffle.parallelism': 2, 
  'hoodie.insert.shuffle.parallelism': 2,
}

# initial insert
df.write.format("hudi"). \
  options(**hudi_options). \
  mode("overwrite"). \
  save(basePath)

updates = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateUpdates(3))
df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df.show()

# update
df.write.format("hudi"). \
  options(**hudi_options). \
  mode("append"). \
  save(basePath)

job.commit()

1 Answer


Instead of mode("append"), use mode("overwrite").
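Applied to the update step from the question, the change would look like the sketch below (reusing the question's hudi_options and basePath; note that with Spark's overwrite save mode Hudi recreates the table at basePath rather than upserting into the existing files):

# update: overwrite instead of append, so only the files from the latest write remain in basePath
df.write.format("hudi"). \
  options(**hudi_options). \
  mode("overwrite"). \
  save(basePath)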

Answered 2021-11-30T14:58:49.917