3

我有一个工作流程,我将描述如下:

[  Dump(query)  ] ---+
                     |
                     +---> [ Parquet(dump, schema) ] ---> [ Hive(parquet) ]
                     |
[ Schema(query) ] ---+

在哪里:

  • query是对 RDBMS 的查询
  • Dump将结果转储query到 CSV 文件dump
  • Schema运行queryxcoms其架构schema
  • Parquet读取csv并用于schema创建 Parquet 文件parquet
  • Hive基于 Parquet 文件创建 Hive 表parquet

这种令人费解的工作流程背后的原因是由于无法解决并且超出问题范围的限制(但是,理想情况下它会比这简单得多)。

我的问题是关于在失败的情况下回滚管道的影响

这些是我希望在不同条件下发生的回滚:

  • dump无论管道的最终结果如何,都应始终删除
  • parquet如果出于某种原因,Hive 表创建失败,则应删除

在工作流程中表示这一点,我可能会这样写:

[  Dump(query)  ] ---+
                     |
                     +---> [ Parquet(dump, schema) ] ---> [ Hive(parquet) ]
                     |                |                          |
[ Schema(query) ] ---+                |                          |
                                      v                          v
                            [ DeleteParquetOutput ] --> [ DeleteDumpOutput ]

仅当发生错误并且进入的转换忽略其依赖项的任何故障时才执行从Parquet到的转换。DeleteParquetOutputDeleteDumpOutput

这应该可以解决它,但我相信更复杂的管道可能会因为这种错误处理逻辑而大大增加复杂性

在继续讨论更多细节之前,我的问题是:在处理 Airflow 管道中的错误时,这是​​否被认为是一种好的做法?有什么不同的(可能更可持续的)方法?

如果您对我想如何解决这个问题进一步感兴趣,请继续阅读,否则请随时回答和/或发表评论。


我对管道中的错误处理的看法

理想情况下,我想做的是:

  • 为每个相关的阶段定义一个回滚过程
  • 对于每个回滚过程,定义它是否应该仅在失败的情况下发生或在任何情况下发生
  • 当管道完成时,反转依赖关系,并从最后一个成功的任务开始,遍历反转的 DAG 并运行相关的回滚过程(如果适用)
  • 应记录回滚过程中的错误,但不考虑完成整个管道的回滚
  • 为了保持前一点,每个任务都应该定义一个单独的效果,其回滚过程可以在不参考其他任务的情况下描述

让我们用给定的管道做几个例子。

场景一:成功

我们反转 DAG 并为其强制回滚过程(如果有)填充每个任务,得到这个

                                         +---> [ Dump: UNDO ]
                                         |
[ Hive: None ] ---> [ Parquet: None ] ---+
^                                        |
|                                        +---> [ Schema: None ]
+--- Start here

场景 2:故障发生在Hive

                                                 +---> [ Dump: UNDO ]
                                                 |
[ Hive: None ] ---> [ Parquet: UNDO (error) ] ---+
                    ^                            |
                    |                            +---> [ Schema: None ]
                    +--- Start here

有什么方法可以在 Airflow 中表示这样的东西吗?我也愿意评估不同的工作流自动化解决方案,如果它们启用这种方法的话。

4

2 回答 2

2

所有操作符和传感器都派生自该类BaseOperator,支持回调:on_success_callbackon_retry_callback--on_failure_callback也许这些会有所帮助。

于 2018-03-30T14:19:07.043 回答
1

似乎是一种处理错误的复杂方法。我认为最好将错误视为简单地停止 DAG 的当前运行,以便您可以解决任何问题并从中断的地方重新启动它。当然,您可以清理由特定任务创建的部分创建的文件,但我不会因为一些下游问题而回退整个管道。

以我们在我工作的地方做的事情为例,诚然,它使用了不同的技术,但我认为使用的是相同的工作流程:

  1. 从源数据库中提取特定时间间隔的增量并将其压缩到 Airlfow 工作服务器
  2. 将此压缩文件移动到 S3 位置
  3. 将 S3 文件复制到 Snowflake 数据仓库中。

使用我们当前的设置 - 如果有人意外更改了我们加载 S3 文件的 Snowflake 表的结构,唯一会失败的任务是最后一个(步骤 3),因为表结构不再与 CSV 结构匹配。要解决此问题,我们只需将表的结构恢复到原来的状态并重新运行失败的任务。然后 Airflow 会将文件从 S3 重新复制到 Snowflake 并成功。

使用您建议的设置会发生什么?如果最后一个任务失败,它将回滚整个管道并从 s3 存储桶中删除 CSV 文件;我们将不得不再次从源数据库下载文件。如果我们简单地重新运行从 s3 复制到 Snowflake 的任务会更好,从而省去运行整个 DAG 的麻烦。

于 2018-03-13T15:57:26.733 回答