我一直在评估气流。我有一个用例,我有一个每小时运行一次的工作流来获取每小时的数据聚合。另一个每天运行以获得相同的每日聚合。是否可以创建一个组合工作流,其中仅当所有每小时聚合在过去一天都成功时才会运行每日聚合?我已经看到您可以创建子 dag,但是这两个 dag 可以以不同的频率运行吗?如果是怎么办?
问问题
1555 次
2 回答
4
不确定您希望它如何工作,但虽然没有一种直接的方法可以做到这一点,但有几种方法可以使用广泛的气流操作员套件来构建这样的 dag。
例如,您可以制作每小时depend_on_past
dag,然后使用 python 分支运算符使当天聚合任务/dag 在当天最后一次运行的每小时 dag 结束时运行/触发。查看PythonBranchOperator
和TriggerDagRunOperator
。
您还可以为每日聚合器创建自己的传感器,以确保当天的所有每小时 dag 都已成功。查看以ExternalTaskSensor
供参考。
于 2016-08-15T22:16:40.487 回答
0
它可能很难看,但是使用 PythonOperator 有一种非常直接的方式可以“在幕后”进行操作:
dag = DAG('hourly_daily_update_v0',
schedule_interval='@hourly')
hourly_update = PythonOperator(task_id='update_hourly_v0',
python_callable=update_hourly,
provide_context=True,
dag=dag)
daily_update = PythonOperator(task_id='update_daily_v0',
python_callable=update_daily,
provide_context=True,
dag=dag)
因此,您每小时和每天都调用 Airflow 方式。但是,在 update_daily() 调用中,您可以检查小时:
def update_daily(**context):
if context['execution_date'].hour == 0: # hour 0
# Do all the things!
else:
# Do none of the things!
Airflow 将每天成功运行 update_daily() 24 次,但实际上它只会在 0 小时执行一次工作。您可以随意扩展它。唯一的问题是 Airflow 假设模式之外的微小步骤,这将在第 1 小时和第 24 小时之间造成一些虚假信息。
于 2019-11-18T17:50:12.933 回答