16

我想尝试使用 Airflow 而不是 Cron。但是 schedule_interval 并没有像我预期的那样工作。

我写了如下的python代码。
据我了解,Airflow 应该在“2016/03/30 8:15:00”上运行,但当时它没有工作。

如果我像这样更改它“'schedule_interval': timedelta(minutes = 5)”,我认为它可以正常工作。

“notice_slack.sh”只是将 slack api 调用到我的频道。

# -*- coding: utf-8 -*-
from __future__ import absolute_import, unicode_literals
import os
from airflow.operators import BashOperator
from airflow.models import DAG
from datetime import datetime, timedelta

args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2016, 3, 29, 8, 15),
}

dag = DAG(
    dag_id='notice_slack',
    default_args=args,
    schedule_interval="@daily",
    dagrun_timeout=timedelta(minutes=1))

# cmd file name
CMD = '/tmp/notice_slack.sh'

run_this = BashOperator(
    task_id='run_transport', bash_command=CMD, dag=dag)

我想每天在特定时间运行我的一些脚本,就像这个 cron 设置一样。

15 08 * * * bash /tmp/notice_slack.sh

我已阅读文档Scheduling & Triggers,我知道它与 cron 有点不同。
所以我尝试安排在“start_date”和“schedule_interval”设置。

有谁知道我该怎么办?

气流版本

信息 - 使用执行器 LocalExecutor

v1.7.0

amazon-linux-ami/2015.09-release-notes

4

5 回答 5

17

尝试这个:

# -*- coding: utf-8 -*-
from __future__ import absolute_import, unicode_literals
import os
from airflow.operators import BashOperator
from airflow.models import DAG
from datetime import datetime, timedelta

args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2016, 3, 29),
}

dag = DAG(
    dag_id='notice_slack',
    default_args=args,
    schedule_interval="15 08 * * *",
    dagrun_timeout=timedelta(minutes=1))

# cmd file name
CMD = 'bash /tmp/notice_slack.sh'

run_this = BashOperator(
    task_id='run_transport', bash_command=CMD, dag=dag)

start_date(datetime) – 任务的 start_date,确定第一个任务实例的 execution_date。最佳做法是将 start_date 四舍五入到 DAG 的 schedule_interval。

schedule_interval(datetime.timedelta 或 dateutil.relativedelta.relativedelta 或 str 充当 cron 表达式)- 定义 DAG 运行的频率,此 timedelta 对象被添加到最新任务实例的 execution_date 以确定下一个计划。

只需在 cron 设置中将schedule_intervaland配置为相同即可。bash_command

于 2016-04-07T02:15:05.757 回答
15

当 2016/03/30 8:15:00 + 计划间隔(每天)过去时,Airflow 将启动您的 DAG。所以你的 DAG 将在 2016/03/31 8:15:00 运行。

您可以查看气流常见问题解答

于 2016-06-28T01:26:36.157 回答
9

首先,您的开始日期应该是过去的 - 而不是'start_date': datetime(2016, 3, 29, 8, 15) 您会尝试'start_date': datetime(2016, 2, 29, 8, 15)

并应用 'catchup':False 以防止回填 - 除非这是您想做的事情。

来自 Airflow 文档 - Airflow 调度程序在 start_date + schedule_interval 传递后不久触发任务。

计划间隔可以作为 cron 提供 - 如果您想每天早上 8:15 运行它,表达式将是 - * '15 8 * * '

如果您只想在 10 月 31 日上午 8:15 运行它,则表达式为 - * '15 8 31 10 '

为了提供这个,'schedule_inteval':'15 8 * * *'在你的 Dag 属性中

您可以从https://crontab.guru/了解更多信息

或者,有气流预设 - 在此处输入图像描述

如果其中任何一个满足您的要求,那就很简单,'schedule_interval':'@hourly'

最后,您还可以将时间表应用为 python timedelta 对象,例如 12 PM

'schedule_interval': timedelta(hours=12)

于 2020-10-22T18:15:49.700 回答
6

如果您不确定如何创建气流 cron 表达式,可以尝试使用crontab.guru

于 2019-08-04T16:19:54.963 回答
1

使用您给出的示例@daily,它将在午夜过后运行您的工作。您可以尝试将其更改timedelta(days=1)为相对于您的固定值start_date(包括 08:15)。或者,您可以使用 cron 规范,schedule_interval='15 08 * * *'在这种情况下,您希望第一次运行的前一天 8:15 之前的任何开始日期都可以使用。

请注意,这depends_on_past: False已经是默认设置,您可能将其行为与catchup=falseDAG 参数中的行为混淆了,这将避免在开始日期和现在 DAG 计划间隔运行的时间之间进行过去运行。

于 2019-02-20T03:27:33.090 回答