
I have to build a Glue job that updates and deletes old rows in an Athena table. When I run the job to delete rows, it returns this error:

AnalysisException: 'Unable to infer schema for Parquet. It must be specified manually.;'

My Glue job:

# Change table (with the op flags) and the target/output table, both read from the Glue Data Catalog
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "test-database", table_name = "test_table", transformation_ctx = "datasource0")
datasource1 = glueContext.create_dynamic_frame.from_catalog(database = "test-database", table_name = "test_table_output", transformation_ctx = "datasource1")

# Expose both DynamicFrames to Spark SQL as temp views
datasource0.toDF().createOrReplaceTempView("view_dyf")
datasource1.toDF().createOrReplaceTempView("view_dyf_output")

# Rows in the output table whose id is flagged for deletion ('D') in the change table
ds = spark.sql("SELECT * FROM view_dyf_output where id in (select id from view_dyf where op like 'D')")

hudi_delete_options = {
  'hoodie.table.name': 'test_table_output',
  'hoodie.datasource.write.recordkey.field': 'id',
  'hoodie.datasource.write.table.name': 'test_table_output',
  'hoodie.datasource.write.operation': 'delete',
  'hoodie.datasource.write.precombine.field': 'name',
  'hoodie.upsert.shuffle.parallelism': 1, 
  'hoodie.insert.shuffle.parallelism': 1
}

from pyspark.sql.functions import lit
# Collect only the record keys to delete; 'name' is just a dummy precombine value
deletes = list(map(lambda row: (row[0],), ds.collect()))
df = spark.sparkContext.parallelize(deletes).toDF(['id']).withColumn('name', lit(0.0))

df.write.format("hudi"). \
  options(**hudi_delete_options). \
  mode("append"). \
  save('s3://data/test-output/')



roAfterDeleteViewDF = spark. \
  read. \
  format("hudi"). \
  load("s3://data/test-output/") 
roAfterDeleteViewDF.createOrReplaceTempView("test_table_output")  # registerTempTable is deprecated

spark.sql("SELECT * FROM view_dyf_output where id in (select distinct id from view_dyf where op like 'D')").count()  

I have 2 data sources: the first is the old Athena table, whose data must be updated or deleted; the second table holds the new updated or deleted records.

In ds I select all rows from the old table that must be deleted.

op holds the operation: 'D' means delete, 'U' means update.

Does anyone know what I'm missing here?


1 Answer


The value of hoodie.datasource.write.operation in your code is invalid; the supported write operations are upsert / insert / bulk_insert. Check the Hudi documentation.

Also, what is your intent when deleting records: a hard delete or a soft delete? For a hard delete, you have to provide {'hoodie.datasource.write.payload.class': 'org.apache.hudi.common.model.EmptyHoodieRecordPayload'}.
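For illustration, here is a minimal sketch of what the write options could look like for a hard delete, reusing the table name, record key, precombine field, and S3 path from the question; the EmptyHoodieRecordPayload causes every matched record key to be written as a tombstone, so the records are physically removed:

# Sketch only: hard-delete options built from the answer above,
# reusing the identifiers from the question.
hudi_hard_delete_options = {
  'hoodie.table.name': 'test_table_output',
  'hoodie.datasource.write.recordkey.field': 'id',
  'hoodie.datasource.write.table.name': 'test_table_output',
  # 'upsert' is one of the supported operations; combined with the
  # payload class below it hard-deletes the matched record keys
  'hoodie.datasource.write.operation': 'upsert',
  'hoodie.datasource.write.payload.class': 'org.apache.hudi.common.model.EmptyHoodieRecordPayload',
  'hoodie.datasource.write.precombine.field': 'name',
  'hoodie.upsert.shuffle.parallelism': 1,
  'hoodie.insert.shuffle.parallelism': 1
}

# df holds the ids to delete, built exactly as in the question
df.write.format("hudi"). \
  options(**hudi_hard_delete_options). \
  mode("append"). \
  save('s3://data/test-output/')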

answered 2021-07-08T13:40:38.840