我必须构建一个胶水作业来更新和删除 Athena 表中的旧行。当我运行我的工作以删除它时,它会返回一个错误:
AnalysisException: 'Unable to infer schema for Parquet. It must be specified manually.;'
我的胶水工作:
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "test-database", table_name = "test_table", transformation_ctx = "datasource0")
datasource1 = glueContext.create_dynamic_frame.from_catalog(database = "test-database", table_name = "test_table_output", transformation_ctx = "datasource1")
datasource0.toDF().createOrReplaceTempView("view_dyf")
datasource1.toDF().createOrReplaceTempView("view_dyf_output")
ds = spark.sql("SELECT * FROM view_dyf_output where id in (select id from view_dyf where op like 'D')")
hudi_delete_options = {
'hoodie.table.name': 'test_table_output',
'hoodie.datasource.write.recordkey.field': 'id',
'hoodie.datasource.write.table.name': 'test_table_output',
'hoodie.datasource.write.operation': 'delete',
'hoodie.datasource.write.precombine.field': 'name',
'hoodie.upsert.shuffle.parallelism': 1,
'hoodie.insert.shuffle.parallelism': 1
}
from pyspark.sql.functions import lit
deletes = list(map(lambda row: (row[0], row[1]), ds.collect()))
df = spark.sparkContext.parallelize(deletes).toDF(['id']).withColumn('name', lit(0.0))
df.write.format("hudi"). \
options(**hudi_delete_options). \
mode("append"). \
save('s3://data/test-output/')
roAfterDeleteViewDF = spark. \
read. \
format("hudi"). \
load("s3://data/test-output/")
roAfterDeleteViewDF.registerTempTable("test_table_output")
spark.sql("SELECT * FROM view_dyf_output where id in (select distinct id from view_dyf where op like 'D')").count()
我有 2 个数据源;第一个旧的 Athena 表,其中数据必须更新或删除,第二个表是新的更新或删除的数据。
在ds
我选择了旧表中必须删除的所有行。
op
用于操作;'D' 表示删除,'U' 表示更新。
有谁知道我在这里想念什么?