0

我们最近尝试采用 Airflow 作为我们的“数据工作流”引擎,虽然我已经弄清楚了大部分事情,但我仍然处于关于调度程序如何计算何时触发 DAG 的灰色地带。

看看这个简单的 dag:

from airflow import DAG
from datetime import datetime
from airflow.operators.bash_operator import BashOperator

dag_options = {                
            'owner':                'Airflow',  
            'depends_on_past':      False,      
            'start_date':           datetime.now()
}

with DAG('test_dag1', schedule_interval="5 * * * *", default_args=dag_options) as dag:
                task1 = BashOperator(      
                task_id='task1', 
                bash_command='date',                
                dag=dag)      

时间表将选择它,但不会执行它。现在,如果我将“start_date”更改为:

datetime(year=xxxx,month=yyyy=day=zzzz) 

xxxx,yyyy,zzzz 是今天的日期,它将开始执行。造成这种情况的原因是调度程序不断从源 dag 文件夹中重新读取此 dag,每次执行 datetime.now(),注意到开始日期与当前排队的时间不同,重新添加此 dag 并因此重新调度/将执行日期向前推(我的dag_dir_list_interval是 300)?

此外,据我了解,在气流中,当 dag 未暂停(或添加 dags_are_paused_at_creation = False)时,调度程序将按如下方式安排执行:

  • 第一次 dag 执行:(start_date + interval) 之后的瞬间
  • 第二次 dag 执行:(start_date + (interval * 2)) 之后的瞬间
  • 第三次 dag 执行:(start_date + (interval * 3)) 之后的瞬间

这是正确的假设吗?

更新(2017 年 7 月 30 日)

基于上述假设,我今天(2017 年 7 月 30 日)创建了这个 dag:

from airflow import DAG
from datetime import datetime
from airflow.operators.bash_operator import BashOperator

dag_options = {                
            'owner':             'Airflow',  
            'depends_on_past':   False,      
            'start_date':   
datetime(year=2017,month=7,day=30,hour=20,minute=10)
}

with DAG('test_dag_100', schedule_interval="*/10 * * * *", 
default_args=dag_options) as dag:
                task1 = BashOperator(      
                task_id='task_100', 
                bash_command='date',                
                dag=dag)      

应该从(UTC)开始:

  • 2017 年 7 月 30 日 20:20:00
  • 2017 年 7 月 30 日 20:30:00
  • 2017 年 7 月 30 日 20:40:00

不幸的是,这并没有发生。以下是我的仪表板的一些屏幕截图:

有人可以解释为什么 20:21:00 dag 没有执行吗?20:31:00 之后它仍然没有执行......我在这里错过了什么?

顺便说一句,我还注意到,出于某种原因,每次我通过仪表板手动启动 dag 时,它只是处于“运行”阶段。为什么是这样?手动启动它是否与任何开始时间选项(start_date/interval/etc)有关?

感谢您提供的任何澄清

4

1 回答 1

2

你的假设是正确的。Airflow 将在指定的计划间隔从开始日期过去后安排第一次 DAG 运行。使用 datetime.now() 作为开始日期将导致 Airflow 很少(如果有的话)触发 DAG。它在调度文档中提到。

如果您要指定一个特定的开始日期,例如 datetime(2017,7,27,1,0),计划间隔为“5 * * * *”,那么在 7 月 27 日凌晨 1:05,DAG 将是第一次触发运行。之后它将继续每五分钟运行一次。

于 2017-07-27T15:43:16.353 回答