我有一个用例,我需要根据 9 个不同的排名条件将三角湖中的青铜表合并到三角湖中的银表。条件根据业务优先级进行排序。当记录没有合并到最高优先级规则中时,它应该沿着列表向下并尝试在下一个规则中合并,直到尝试所有 9 条规则。任何与这 9 条规则中的标准不匹配的内容都应分配内部 id 并添加到银表中
例如:
- 如果青铜表中的记录有 my_company_id 将其与 my_company_id 上的白银表合并
- 如果青铜表中的记录没有在规则一中合并,请尝试在 some_external_id、first_name、last_name 上合并它
- 如果青铜表中的记录没有在规则一和规则二中合并,则尝试在 ssn、first_name、last_name、zip_code .... 上合并它,直到规则 9
任何不匹配的我都应该分配它 my_company_id
基本上,银表应该有最新的记录,并且每条记录都应该有内部的 my_company_id,无论它已经存在还是新记录。
我正在使用火花流和 foreachBatch 方法,但我不确定通过所有规则的最佳方法是什么。其中一种方法可能是根据我需要用于不同规则的字段过滤不同组中的微批次
def upsertToDelta(microBatchOutputDF, batchId):
rule1_candidates = microBatchOutputDF.filter("my_identifier != ''")
person_silver_df.alias("old").merge(rule1_candidates.alias("new"), \
"old.my_identifier = new.my_identifier")\
.whenMatchedUpdateAll()\
.execute()
rule2_candidates = microBatchOutputDF.filter("some_external_id != '' AND first_name != '' AND last_name != ''")
person_silver_df.alias("old").merge(rule2_candidates.alias("new"),\
"old.some_external_id = new.some_external_id AND \
old.first_name = new.first_name AND \
old.last_name = new.last_name")\
.whenMatchedUpdate(set = {
"field_a": "new.field_a",
"field_b": "new.field_b"
})\
.execute()
#.......and so on
person_query = person_bronze_df.writeStream\
.format("delta") \
.foreachBatch(upsertToDelta) \
.outputMode("update") \
.start()
person_query.awaitTermination()
但问题是如果 rule1_candidates 出于某种原因与 my_identifier 不匹配,我仍然必须将其传递给优先级较低的规则,因此我必须在合并后实现逻辑以找出未合并的内容。只是把所有的东西都缝合在了复杂的地方,这似乎不是一个正确的方法。有没有更好的方法来做到这一点?