I have a MOR table with key = acctid. When I make 3 commits against the same key and then read in incremental mode, I only see the first commit. Is there a way to use incremental mode to read the last commit, or all commits, for a given key?
Details below.
On the first run I inserted the following data into the MOR table:
input_df = spark.createDataFrame(
[
(100, "2015-01-01", "2015-01-01T01:01:01.010101Z", 10),
(101, "2015-01-01", "2015-01-01T01:01:01.010101Z", 10),
(102, "2015-01-01", "2015-01-01T01:01:01.010101Z", 10),
(103, "2015-01-01", "2015-01-01T01:01:01.010101Z", 10),
(104, "2015-01-01", "2015-01-01T01:01:01.010101Z", 10),
(105, "2015-01-01", "2015-01-01T01:01:01.010101Z", 10),
],
("acctid", "date", "ts", "deposit"),
)
The hudi options are:
hudi_options = {
    'hoodie.table.name': 'compaction',
    'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.recordkey.field': 'acctid',
    'hoodie.datasource.write.partitionpath.field': 'date',
    'hoodie.datasource.write.precombine.field': 'ts',
    'hoodie.datasource.write.hive_style_partitioning': 'true',
    'hoodie.upsert.shuffle.parallelism': 2,
    'hoodie.insert.shuffle.parallelism': 2,
    'hoodie.delete.shuffle.parallelism': 2,
}
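My understanding of the precombine field is that, with the default payload, Hudi keeps the record with the highest `ts` per record key when records collide. A pure-Python illustration of that semantics (this is only a sketch of the behavior, not Hudi's actual code; the helper name is made up):

```python
# Illustration only (NOT Hudi's actual implementation): with
# 'hoodie.datasource.write.precombine.field' = 'ts', the default payload
# keeps, per record key, the record whose precombine value is highest.
def precombine(records, key_field="acctid", precombine_field="ts"):
    """Collapse records sharing a key, keeping the largest precombine value."""
    latest = {}
    for rec in records:
        key = rec[key_field]
        if key not in latest or rec[precombine_field] > latest[key][precombine_field]:
            latest[key] = rec
    return list(latest.values())

rows = [
    {"acctid": 100, "ts": "2015-01-01T11:01:01.000000Z", "deposit": 11},
    {"acctid": 100, "ts": "2015-01-01T13:01:01.000000Z", "deposit": 13},
]
print(precombine(rows))  # only the ts=13:01 record survives for acctid 100
```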
After that, I ran 3 updates against key 100, each with a different ts and deposit value, so that 3 commits land on the same key:
# UPDATE deposit to **11** for key 100
update_df = spark.createDataFrame(
[(100, "2015-01-01", "2015-01-01T11:01:01.000000Z", 11)],("acctid", "date", "ts", "deposit"))
update_df.write.format("org.apache.hudi").options(**hudi_options).mode("append").save(hudi_dataset)
# UPDATE deposit to **12** for key 100
update_df = spark.createDataFrame(
[(100, "2015-01-01", "2015-01-01T12:01:01.000000Z", 12)],("acctid", "date", "ts", "deposit"))
update_df.write.format("org.apache.hudi").options(**hudi_options).mode("append").save(hudi_dataset)
# UPDATE deposit to **13** for key 100
update_df = spark.createDataFrame(
[(100, "2015-01-01", "2015-01-01T13:01:01.000000Z", 13)],("acctid", "date", "ts", "deposit"))
update_df.write.format("org.apache.hudi").options(**hudi_options).mode("append").save(hudi_dataset)
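The `first_commit` value below was read off the timeline manually for this particular run. A small helper can list the completed instants instead (a sketch assuming the Hudi 0.x layout, where completed actions appear as `<instant>.commit` / `<instant>.deltacommit` files directly under `<base_path>/.hoodie`; MOR upserts produce deltacommits):

```python
import os

def completed_instants(base_path):
    """List completed commit instants from the Hudi timeline, sorted ascending.

    Assumes the Hudi 0.x layout: completed actions are stored as
    '<instant>.commit' / '<instant>.deltacommit' files under '<base_path>/.hoodie'.
    """
    timeline_dir = os.path.join(base_path, ".hoodie")
    instants = []
    for name in os.listdir(timeline_dir):
        for suffix in (".commit", ".deltacommit"):
            if name.endswith(suffix):
                instants.append(name[: -len(suffix)])
    return sorted(instants)

# Demo against a fake timeline (a real table would pass hudi_dataset instead):
import tempfile
tmp = tempfile.mkdtemp()
os.makedirs(os.path.join(tmp, ".hoodie"))
for instant in ("20210719234312", "20210719234500", "20210719234655"):
    open(os.path.join(tmp, ".hoodie", instant + ".deltacommit"), "w").close()
print(completed_instants(tmp))
```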
first_commit = '20210719234312' # As per this particular run
output_df = (spark.read
.option("hoodie.datasource.query.type", "incremental")
.option("hoodie.datasource.read.begin.instanttime", first_commit)
.format("org.apache.hudi")
.load(hudi_dataset+"/*/*"))
output_df.show()
In this output I see deposit = 11. Is there a way to get deposit = 13 in incremental mode without using compaction?
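One thing worth trying (I have not verified it against this dataset) is to issue one incremental query per commit, bounding each read with `hoodie.datasource.read.end.instanttime` so that every window contains exactly one commit. The windowing itself is plain Python; the instant list would come from the timeline, and the Spark read each window would drive is shown in the comments:

```python
def commit_windows(instants):
    """Turn a sorted list of commit instants into (begin, end) pairs for
    per-commit incremental reads. begin.instanttime is exclusive in Hudi
    incremental queries, so '000' serves as the open lower bound for the
    first commit."""
    windows = []
    begin = "000"
    for instant in instants:
        windows.append((begin, instant))
        begin = instant
    return windows

instants = ["20210719234312", "20210719234500", "20210719234655"]
for begin, end in commit_windows(instants):
    # Each pair would drive one Spark read, e.g.:
    #   spark.read.format("org.apache.hudi")
    #        .option("hoodie.datasource.query.type", "incremental")
    #        .option("hoodie.datasource.read.begin.instanttime", begin)
    #        .option("hoodie.datasource.read.end.instanttime", end)
    #        .load(hudi_dataset)
    print(begin, "->", end)
```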