我有一个工作流程,我将描述如下:
[ Dump(query) ] ---+
|
+---> [ Parquet(dump, schema) ] ---> [ Hive(parquet) ]
|
[ Schema(query) ] ---+
在哪里:
query
是对 RDBMS 的查询Dump
将结果转储query
到 CSV 文件dump
Schema
运行query
和xcoms其架构schema
Parquet
读取csv
并用于schema
创建 Parquet 文件parquet
Hive
基于 Parquet 文件创建 Hive 表parquet
这种令人费解的工作流程背后的原因是由于无法解决并且超出问题范围的限制(但是,理想情况下它会比这简单得多)。
我的问题是关于在失败的情况下回滚管道的影响。
这些是我希望在不同条件下发生的回滚:
dump
无论管道的最终结果如何,都应始终删除parquet
如果出于某种原因,Hive 表创建失败,则应删除
在工作流程中表示这一点,我可能会这样写:
[ Dump(query) ] ---+
|
+---> [ Parquet(dump, schema) ] ---> [ Hive(parquet) ]
| | |
[ Schema(query) ] ---+ | |
v v
[ DeleteParquetOutput ] --> [ DeleteDumpOutput ]
仅当发生错误并且进入的转换忽略其依赖项的任何故障时才执行从Parquet
到的转换。DeleteParquetOutput
DeleteDumpOutput
这应该可以解决它,但我相信更复杂的管道可能会因为这种错误处理逻辑而大大增加复杂性。
在继续讨论更多细节之前,我的问题是:在处理 Airflow 管道中的错误时,这是否被认为是一种好的做法?有什么不同的(可能更可持续的)方法?
如果您对我想如何解决这个问题进一步感兴趣,请继续阅读,否则请随时回答和/或发表评论。
我对管道中的错误处理的看法
理想情况下,我想做的是:
- 为每个相关的阶段定义一个回滚过程
- 对于每个回滚过程,定义它是否应该仅在失败的情况下发生或在任何情况下发生
- 当管道完成时,反转依赖关系,并从最后一个成功的任务开始,遍历反转的 DAG 并运行相关的回滚过程(如果适用)
- 应记录回滚过程中的错误,但不考虑完成整个管道的回滚
- 为了保持前一点,每个任务都应该定义一个单独的效果,其回滚过程可以在不参考其他任务的情况下描述
让我们用给定的管道做几个例子。
场景一:成功
我们反转 DAG 并为其强制回滚过程(如果有)填充每个任务,得到这个
+---> [ Dump: UNDO ]
|
[ Hive: None ] ---> [ Parquet: None ] ---+
^ |
| +---> [ Schema: None ]
+--- Start here
场景 2:故障发生在Hive
+---> [ Dump: UNDO ]
|
[ Hive: None ] ---> [ Parquet: UNDO (error) ] ---+
^ |
| +---> [ Schema: None ]
+--- Start here
有什么方法可以在 Airflow 中表示这样的东西吗?我也愿意评估不同的工作流自动化解决方案,如果它们启用这种方法的话。