7

我一直在评估气流。我有一个用例,我有一个每小时运行一次的工作流来获取每小时的数据聚合。另一个每天运行以获得相同的每日聚合。是否可以创建一个组合工作流,其中仅当所有每小时聚合在过去一天都成功时才会运行每日聚合?我已经看到您可以创建子 dag,但是这两个 dag 可以以不同的频率运行吗?如果是怎么办?

4

2 回答 2

4

不确定您希望它如何工作,但虽然没有一种直接的方法可以做到这一点,但有几种方法可以使用广泛的气流操作员套件来构建这样的 dag。

例如,您可以制作每小时depend_on_pastdag,然后使用 python 分支运算符使当天聚合任务/dag 在当天最后一次运行的每小时 dag 结束时运行/触发。查看PythonBranchOperatorTriggerDagRunOperator

您还可以为每日聚合器创建自己的传感器,以确保当天的所有每小时 dag 都已成功。查看以ExternalTaskSensor供参考。

于 2016-08-15T22:16:40.487 回答
0

它可能很难看,但是使用 PythonOperator 有一种非常直接的方式可以“在幕后”进行操作:

dag = DAG('hourly_daily_update_v0',
          schedule_interval='@hourly')

hourly_update = PythonOperator(task_id='update_hourly_v0',
                               python_callable=update_hourly,
                               provide_context=True,
                               dag=dag)

daily_update = PythonOperator(task_id='update_daily_v0',
                               python_callable=update_daily,
                               provide_context=True,
                               dag=dag)

因此,您每小时和每天都调用 Airflow 方式。但是,在 update_daily() 调用中,您可以检查小时:

def update_daily(**context):
    if context['execution_date'].hour == 0: # hour 0
        # Do all the things!
    else:
        # Do none of the things!

Airflow 将每天成功运行 update_daily() 24 次,但实际上它只会在 0 小时执行一次工作。您可以随意扩展它。唯一的问题是 Airflow 假设模式之外的微小步骤,这将在第 1 小时和第 24 小时之间造成一些虚假信息。

于 2019-11-18T17:50:12.933 回答