2

。嗨,大家好,

从 Airflow UI 中,我们试图了解如何在未来的特定时间启动 DAG 运行,但我们总是在追赶模式下获得 2 次额外的运行(即使追赶被禁用)

例子

使用以下参数创建 DAG 运行

  • 开始日期:10:30
  • execution_date:未定义
  • 间隔 = 3 分钟(来自 .py 文件)
  • catchup_by_default = False

在当前时间打开ON 开关:10:28。我们得到的是 Airflow 触发了 2 个 DAG 运行,execution_date 位于:

  • 10:24
  • 10:27

并且这 2 次 DAG 运行一个接一个地以追赶模式运行,这不是我们想要的 :-(

我们做错了什么?我们可能理解 10:27 的运行(ETL 概念),但我们没有得到 10:24 的运行 :-(

感谢您的帮助 :-)

细节:

操作系统:红帽 7

蟒蛇:2.7

气流:v1.8.0

DAG python 文件:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


default_args = {
     'owner': 'aa',
     'depends_on_past': False,
     'start_date': datetime(2017, 9, 7, 10, 30),
     'run_as_user': 'aa'
}

dag = DAG(
    'dag3', default_args=default_args, schedule_interval=timedelta(minutes=3))
dag.catchup = False

create_command = "/script.sh "

t1 = BashOperator(
    task_id='task',
    bash_command='date',
    dag=dag)
4

3 回答 3

3

我尝试在 SQLite 上使用 Airflow v.1.8.0、python v.3.5、db。以下 DAG 在 10:28 未暂停,与您的非常相似,并且可以正常工作(仅运行一次,在 10:33,持续到 10:30)。

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

def print_hello_3min():
    return ('Hello world! %s' % datetime.now())

dag = DAG('hello_world_3min', description='Simple tutorial DAG 3min',
          schedule_interval='*/3 * * * *',
          start_date=datetime(2017, 9, 18, 10, 30),
          catchup=False)

dummy_operator = DummyOperator(task_id='dummy_task_3min', retries=3, dag=dag)

hello_operator = PythonOperator(task_id='hello_task_3min',
                                python_callable=print_hello_3min, dag=dag)

dummy_operator >> hello_operator
于 2017-09-18T15:04:41.427 回答
2

这似乎仅在提供 atimedelta作为时间表时才会发生。将您的计划间隔切换为 cron 格式,它将不再运行两次。

于 2019-06-09T01:32:15.853 回答
2

StackEdit 编写

我不确定我的解决方案是否足够好,但我想表达我的理解。有两件事要一起考虑:

  1. schedule_interval 模式,如'hourly'、'daily'、'weekly'、'annually'。

    • 每小时 = (* 1 * * *) = “1 小时后的每一分钟。”</li>
    • 每日 = (0 1 * * *) = “01:00。”</li>
    • 每月 = (0 1 1 * *) = “在第 1 天的 01:00。”</li>
  2. 开始日期

    • 每小时 = 日期时间(2019, 4, 5, 1, 30)
    • 每天 = 日期时间(2019 年 4 月 5 日)
    • 每月 = 日期时间(2019, 4, 1)

我的策略是通过以间隔模式的 1 个单位减去预期的开始日期和时间来设置 [start_date] 。

例子:

  1. 2019-4-5 01:00开始第一份工作,时间间隔为每小时

    • schedule_interval 模式 =每小时
    • 预计开始日期时间 = 2019-4-5 01:00
    • 所以,start_date = 2019-4-5 00:00
    • 减一小时一小时
    • CRON = ( * 1 * * * ) 表示“1 小时后的每一分钟”。</li>
    default_args = {
         'owner': 'aa',
         'depends_on_past': False,
         'start_date': datetime(2019, 4, 5, 0, 0),
         'run_as_user': 'aa'
    }    
    dag = DAG(
        'dag3', default_args=default_args, catchup = False, schedule_interval='* 1 * * *')
  1. 2019-4-5 01:00开始第一份工作,间隔为每天

    • schedule_interval 模式 =每天
    • 预计开始日期时间日期= 2019-4-5 01:00
    • 所以,开始日期= 2019-4-4
    • 减天减1天
    • CRON = ( 0 1 * * * ) 表示“在 01:00”。</li>
    default_args = {
        'owner': 'aa',
        'depends_on_past': False,
        'start_date': datetime(2019, 4, 4),
        'run_as_user': 'aa'
    }

    dag = DAG(
        'dag3', default_args=default_args, catchup = False, schedule_interval='0 1 * * *')
  1. 2019-4-5 01:00开始第一份工作,间隔为每月

    • schedule_interval 模式 =每月
    • 预计开始日期时间日期 = 2019- 4-5 01:00
    • 所以,开始日期 = 2019- 4-4
    • 减天减1天
    • CRON = ( 0 1 1 * * ) 表示“在第 1 天的 01:00”。</li>
    default_args = {
         'owner': 'aa',
         'depends_on_past': False,
         'start_date': datetime(2019, 4, 4),
         'run_as_user': 'aa'
    }

    dag = DAG(
        'dag3', default_args=default_args, catchup = False, schedule_interval='0 1 1 * *')

到目前为止,该策略对我有用,但如果有人变得更好,请分享。

PS。我正在使用 [ https://crontab.guru]来生成完美的 cron 时间表。

于 2019-04-05T09:06:25.147 回答