14

代码:

Python 版本 2.7.x 和气流版本 1.5.1

我的 dag 脚本是这样的

from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta


default_args = {
'owner': 'xyz',
'depends_on_past': False,
'start_date': datetime(2015,10,13),
'email': ['xyz@email.in'],
'schedule_interval':timedelta(minutes=5),
'email_on_failure': True,
'email_on_retry': True,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('testing', default_args=default_args)
run_this_first = BashOperator(task_id='Start1',bash_command='date', dag=dag)
for i in range(5):
    t = BashOperator(task_id="Orders1"+str(i), bash_command='sleep 5',dag=dag)
    t.set_upstream(run_this_first)

从那里你可以看到我正在创建一个包含 6 个任务的 DAG,第一个任务(Start1)首先启动,然后所有其他五个任务启动

目前我在 DAG 开始之间延迟了 5 分钟

对于第一种类型的所有六个任务,它已经完美运行,但五分钟后 DAG 没有重新启动

已经超过 1 小时 DAG 没有重新启动我真的不知道我错了。

如果有人能指出我出了什么问题,那就太好了。我尝试使用airflow testing clearthen 清除以发生同样的事情。它首先运行然后就站在那里。

命令行显示的唯一内容是Getting all instance for DAG testing

当我更改 schedule_interval 的位置时,它只会在没有任何调度间隔的情况下并行运行。也就是说,在 5 分钟内完成了 300 个或更多任务实例。没有 5 分钟的计划间隔

代码 2:

from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta


default_args = {
'owner': 'xyz',
'depends_on_past': False,
'start_date': datetime(2015,10,13),
'email': ['xyz@email.in'],
'email_on_failure': True,
'email_on_retry': True,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('testing',schedule_interval=timedelta(minutes=5),default_args=default_args)#Schedule here
run_this_first = BashOperator(task_id='Start1',bash_command='date', dag=dag)
for i in range(5):
    t = BashOperator(task_id="Orders1"+str(i), bash_command='sleep 5',dag=dag)
    t.set_upstream(run_this_first)
4

2 回答 2

11

对于代码 2,我猜它每分钟运行的原因是:

  1. 开始时间为2015-10-13 00:00

  2. 计划间隔为 5 分钟

  3. 调度器的每一次心跳(默认为 5 秒),都会检查你的 DAG

    • 第一次检查:开始日期(未找到最后执行日期)+调度程序间隔<当前时间?如果是,将执行 DAG 并记录最后执行时间。(例如 2015-10-13 00:00 + 5min < 当前?)
    • 第二次检查下一个心跳:上次执行时间+调度程序间隔<当前时间?如果是这样,将再次执行 DAG。
    • ……

解决方案是将 DAG start_date 设置为datetime.now() - schedule_interval.

如果你想调试:

  1. debug在 settings.py中将 LOGGINGLEVEL 设置为

  2. 修改tois_queueable()的类方法airflow.models.TaskInstance

def is_queueable(self, flag_upstream_failed=False):
    logging.debug('Checking whether task instance is queueable or not!')
    if self.execution_date > datetime.now() - self.task.schedule_interval:
        logging.debug('Too early to execute: execution_date {0} + task.schedule_interval {1} > datetime.now() {2}'.format(self.execution_date, self.task.schedule_interval, datetime.now()))
        return False
        ...
于 2015-10-15T13:59:04.567 回答
4

因为开始时间(2015-10-13 00:00)比现在时间短,所以触发了气流回填。它将从 2015 年 10 月 13 日 00:00 开始运行,此时气流调度程序检测到的每一秒(其开始日期),但执行日期在 5 分钟(任务间隔时间)之间。

查看日志名称:

$tree airflow/logs/testing/
testing/
|-- Orders10
|   |-- 2015-10-13T00:00:00
|   |-- 2015-10-13T00:05:00
|   -- 2015-10-13T00:10:00
|-- Orders11
|   |-- 2015-10-13T00:00:00
|   |-- 2015-10-13T00:05:00
|   -- 2015-10-13T00:10:00
|-- Orders12
|   |-- 2015-10-13T00:00:00
|   |-- 2015-10-13T00:05:00
|   -- 2015-10-13T00:10:00
|-- Orders13
|   |-- 2015-10-13T00:00:00
|   |-- 2015-10-13T00:05:00
|   -- 2015-10-13T00:10:00
|-- Orders14
|   |-- 2015-10-13T00:00:00
|   |-- 2015-10-13T00:05:00
|   -- 2015-10-13T00:10:00
-- Start1
    |-- 2015-10-13T00:00:00
    |-- 2015-10-13T00:05:00
    |-- 2015-10-13T00:10:00
    -- 2015-10-13T00:15:00

查看日志的创建时间:

$ll airflow/logs/testing/Start1
-rw-rw-r-- 1 admin admin 4192 Nov  9 14:50 2015-10-13T00:00:00
-rw-rw-r-- 1 admin admin 4192 Nov  9 14:50 2015-10-13T00:05:00
-rw-rw-r-- 1 admin admin 4192 Nov  9 14:51 2015-10-13T00:10:00
-rw-rw-r-- 1 admin admin 4192 Nov  9 14:52 2015-10-13T00:15:00

此外,您可以在 Web UI 上查看任务实例:

气流任务实例

于 2015-11-09T07:08:40.117 回答