
I am trying to create a simple Hudi table with the MERGE_ON_READ table type. After running the code below, the hoodie.properties file shows hoodie.table.type=COPY_ON_WRITE.

Am I missing something here?

Jupyter Notebook with this code: https://github.com/sannidhiteredesai/spark/blob/master/hudi_acct.ipynb

hudi_options = {
    "hoodie.table.name": "hudi_acct",
    "hoodie.table.type": "MERGE_ON_READ",
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.write.recordkey.field": "acctid",
    "hoodie.datasource.write.precombine.field": "ts",
    "hoodie.datasource.write.partitionpath.field": "date",
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.upsert.shuffle.parallelism": 8,
    "hoodie.insert.shuffle.parallelism": 8,
}

input_df = spark.createDataFrame(
    [
        (100, "2015-01-01", "2015-01-01T13:51:39.340396Z", 10),
        (101, "2015-01-01", "2015-01-01T12:14:58.597216Z", 10),
        (102, "2015-01-01", "2015-01-01T13:51:40.417052Z", 10),
        (103, "2015-01-01", "2015-01-01T13:51:40.519832Z", 10),
        (104, "2015-01-02", "2015-01-01T12:15:00.512679Z", 10),
        (104, "2015-01-02", "2015-01-01T12:15:00.512679Z", 10),
        (104, "2015-01-02", "2015-01-02T12:15:00.512679Z", 20),
        (105, "2015-01-02", "2015-01-01T13:51:42.248818Z", 10),
    ],
    ("acctid", "date", "ts", "deposit"),
)

# INSERT
(
    input_df.write.format("org.apache.hudi")
    .options(**hudi_options)
    .mode("append")
    .save(hudi_dataset)
)


update_df = spark.createDataFrame(
    [(100, "2015-01-01", "2015-01-01T13:51:39.340396Z", 20)],
    ("acctid", "date", "ts", "deposit"))

# UPDATE
(
    update_df.write.format("org.apache.hudi")
    .options(**hudi_options)
    .mode("append")
    .save(hudi_dataset)
)

EDIT: After running the code above, I see 2 parquet files created in the date=2015-01-01 partition. When reading the second parquet file I expected to get only the 1 updated record, but I can also see all the other records of that partition in it.
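For reference, with COPY_ON_WRITE each commit rewrites the affected parquet file in full, so the newest file in a partition holds every record of its file group, not just the changed one. A minimal sketch of reading the current table state through the Hudi datasource instead of the raw parquet files, reusing spark and hudi_dataset from above (older Hudi releases may need a path glob such as hudi_dataset + "/*"):

# Snapshot read: Hudi resolves the latest file slice per file group,
# so each record appears exactly once with its latest value.
snapshot_df = (
    spark.read.format("org.apache.hudi")
    .load(hudi_dataset)
)
snapshot_df.filter("acctid = 100").show()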


2 Answers


When loading the data into Hudi for the first time with insert, can you try mode("overwrite") and see if that works?
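A minimal sketch of that suggestion, reusing hudi_options, input_df, update_df, and hudi_dataset from the question: overwrite on the very first write so Hudi (re)creates the table and its hoodie.properties from the supplied options, then keep append for later upserts.

# Initial load: mode("overwrite") recreates the table at hudi_dataset,
# so the table-level properties are written fresh from hudi_options.
(
    input_df.write.format("org.apache.hudi")
    .options(**hudi_options)
    .mode("overwrite")
    .save(hudi_dataset)
)

# Subsequent writes: mode("append") upserts into the existing table.
(
    update_df.write.format("org.apache.hudi")
    .options(**hudi_options)
    .mode("append")
    .save(hudi_dataset)
)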

Answered 2021-07-11T00:59:27.413

The problem is the "hoodie.table.type": "MERGE_ON_READ" config: on the write path you have to use hoodie.datasource.write.table.type instead. If you update the configuration as follows, it will work. I have tested it.

hudi_options = {
    "hoodie.table.name": "hudi_acct",
    "hoodie.datasource.write.table.type": "MERGE_ON_WRITE",
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.write.recordkey.field": "acctid",
    "hoodie.datasource.write.precombine.field": "ts",
    "hoodie.datasource.write.partitionpath.field": "date",
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.upsert.shuffle.parallelism": 8,
    "hoodie.insert.shuffle.parallelism": 8,
    "hoodie.compact.inline": "true",
    "hoodie.compact.inline.max.delta.commits": 10
}
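To confirm the fix took effect, hoodie.properties can be inspected after the first write; a minimal sketch, assuming hudi_dataset is a plain local filesystem path (on other filesystems, read it with the matching client instead):

import os

# hoodie.properties lives under .hoodie/ in the table base path.
with open(os.path.join(hudi_dataset, ".hoodie", "hoodie.properties")) as f:
    for line in f:
        if line.startswith("hoodie.table.type"):
            print(line.strip())  # expect: hoodie.table.type=MERGE_ON_READ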
Answered 2021-07-12T19:09:53.563