代码:
Python 版本 2.7.x 和气流版本 1.5.1
我的 dag 脚本是这样的
from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'xyz',
'depends_on_past': False,
'start_date': datetime(2015,10,13),
'email': ['xyz@email.in'],
'schedule_interval':timedelta(minutes=5),
'email_on_failure': True,
'email_on_retry': True,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('testing', default_args=default_args)
run_this_first = BashOperator(task_id='Start1',bash_command='date', dag=dag)
for i in range(5):
t = BashOperator(task_id="Orders1"+str(i), bash_command='sleep 5',dag=dag)
t.set_upstream(run_this_first)
从那里你可以看到我正在创建一个包含 6 个任务的 DAG,第一个任务(Start1)首先启动,然后所有其他五个任务启动
目前我在 DAG 开始之间延迟了 5 分钟
对于第一种类型的所有六个任务,它已经完美运行,但五分钟后 DAG 没有重新启动
已经超过 1 小时 DAG 没有重新启动我真的不知道我错了。
如果有人能指出我出了什么问题,那就太好了。我尝试使用airflow testing clear
then 清除以发生同样的事情。它首先运行然后就站在那里。
命令行显示的唯一内容是Getting all instance for DAG testing
当我更改 schedule_interval 的位置时,它只会在没有任何调度间隔的情况下并行运行。也就是说,在 5 分钟内完成了 300 个或更多任务实例。没有 5 分钟的计划间隔
代码 2:
from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'xyz',
'depends_on_past': False,
'start_date': datetime(2015,10,13),
'email': ['xyz@email.in'],
'email_on_failure': True,
'email_on_retry': True,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('testing',schedule_interval=timedelta(minutes=5),default_args=default_args)#Schedule here
run_this_first = BashOperator(task_id='Start1',bash_command='date', dag=dag)
for i in range(5):
t = BashOperator(task_id="Orders1"+str(i), bash_command='sleep 5',dag=dag)
t.set_upstream(run_this_first)