
I have a MOR table with key = acctid. When I make 3 commits on the same key and then try to read in incremental mode, I only see the first commit. Is there a way to read the last commit, or all commits, for a given key using incremental mode?

Please see the details below:

On the first run, I inserted the following data into the MOR table:

input_df = spark.createDataFrame(
    [
        (100, "2015-01-01", "2015-01-01T01:01:01.010101Z", 10),
        (101, "2015-01-01", "2015-01-01T01:01:01.010101Z", 10),
        (102, "2015-01-01", "2015-01-01T01:01:01.010101Z", 10),
        (103, "2015-01-01", "2015-01-01T01:01:01.010101Z", 10),
        (104, "2015-01-01", "2015-01-01T01:01:01.010101Z", 10),
        (105, "2015-01-01", "2015-01-01T01:01:01.010101Z", 10),
    ],
    ("acctid", "date", "ts", "deposit"),
)

The Hudi options are:

hudi_options = {
    'hoodie.table.name': 'compaction',
    'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.recordkey.field': 'acctid',
    'hoodie.datasource.write.partitionpath.field': 'date',
    'hoodie.datasource.write.precombine.field': 'ts',
    'hoodie.datasource.write.hive_style_partitioning': 'true',
    'hoodie.upsert.shuffle.parallelism': 2,
    'hoodie.insert.shuffle.parallelism': 2,
    'hoodie.delete.shuffle.parallelism': 2,
}

After that, I ran 3 updates for key 100, each with a different ts value and deposit, so that 3 commits were made on the same key:

# UPDATE deposit to **11** for key 100
update_df = spark.createDataFrame(
    [(100, "2015-01-01", "2015-01-01T11:01:01.000000Z", 11)],("acctid", "date", "ts", "deposit"))
update_df.write.format("org.apache.hudi").options(**hudi_options).mode("append").save(hudi_dataset)
# UPDATE deposit to **12** for key 100
update_df = spark.createDataFrame(
    [(100, "2015-01-01", "2015-01-01T12:01:01.000000Z", 12)],("acctid", "date", "ts", "deposit"))
update_df.write.format("org.apache.hudi").options(**hudi_options).mode("append").save(hudi_dataset)
# UPDATE deposit to **13** for key 100
update_df = spark.createDataFrame(
    [(100, "2015-01-01", "2015-01-01T13:01:01.000000Z", 13)],("acctid", "date", "ts", "deposit"))
update_df.write.format("org.apache.hudi").options(**hudi_options).mode("append").save(hudi_dataset)
first_commit = '20210719234312' # As per this particular run

output_df = (spark.read
             .option("hoodie.datasource.query.type", "incremental")
             .option("hoodie.datasource.read.begin.instanttime", first_commit)
             .format("org.apache.hudi")
             .load(hudi_dataset+"/*/*"))

output_df.show()

In this output I see deposit = 11. Is there a way to get deposit = 13 in incremental mode, without running compaction?
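For context on how I understand `hoodie.datasource.read.begin.instanttime` to behave: Hudi instant times are timestamp strings (e.g. `20210719234312`), so the incremental query should return commits strictly after the given instant, which amounts to a lexicographic comparison. A minimal plain-Python sketch of that filter, using hypothetical instant values for the three upserts:

```python
# Hypothetical commit instants for the three upserts (yyyyMMddHHmmss format).
commits = ["20210719234312", "20210719234415", "20210719234520"]

first_commit = commits[0]

# begin.instanttime acts as an exclusive lower bound: an incremental query
# should surface only commits strictly after this instant.
later_commits = [c for c in commits if c > first_commit]
print(later_commits)  # the second and third upserts
```

So with `begin.instanttime = first_commit` I would expect the incremental read to cover the later two commits, yet I only see the row from the first update.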
