0

我计划从今天 2020/08/11 开始,在东部标准时间 (NY) 周二至周六凌晨 04:00 运行 DAG。在编写代码并部署之后,我预计 DAG 会被触发。我刷新了 Airflow UI 页面几次,但仍然没有触发。我正在使用带有 python 3 的 Airflow 版本 v1.10.9-composer。

这是我的 DAG 代码:

"""
This DAG executes a retrieval job
"""

# Required packages to execute DAG

from __future__ import print_function
import pendulum
from airflow.models import DAG
from airflow.models import Variable
from datetime import datetime, timedelta
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.trigger_rule import TriggerRule


local_tz = pendulum.timezone("America/New_York")

# DAG parameters

default_args = {
    'owner': 'Me',
    'depends_on_past': False,
    'start_date': datetime(2020, 8, 10, 4, tzinfo=local_tz),
    'dagrun_timeout': None,
    'email': Variable.get('email'),
    'email_on_failure': True,
    'email_on_retry': False,
    'provide_context': True,
    'retries': None,
    'retry_delay': timedelta(minutes=5)
}

# create DAG object with Name and default_args
with DAG(
        'retrieve_files',
        schedule_interval='0 4 * * 2-6',
        description='Retrieves files from sftp',
        max_active_runs=1,
        catchup=True,
        default_args=default_args
) as dag:
    # Define tasks - below are dummy tasks and a task instantiated by SSHOperator- calling methods written in other py class
    start_dummy = DummyOperator(
        task_id='start',
        dag=dag
    )

    end_dummy = DummyOperator(
        task_id='end',
        trigger_rule=TriggerRule.NONE_FAILED,
        dag=dag
    )

    retrieve_file = SSHOperator(
        ssh_conn_id="my_conn",
        task_id='retrieve_file',
        command='/usr/bin/python3  /path_to_file/getFile.py',
        dag=dag)


    dag.doc_md = __doc__

    retrieve_file.doc_md = """\
    #### Task Documentation
    Connects to sftp and retrieves files.
    """

    start_dummy >> retrieve_file >> end_dummy
4

1 回答 1

1

参考官方文档

调度程序在开始日期一个 schedule_interval 运行您的作业。

如果您的 start_date 是 2020-01-01 并且 schedule_interval 是 @daily,那么第一次运行将在 2020-01-02 即在您的开始日期过去之后创建。

为了在每天的特定时间(包括今天)运行 DAG,start_date需要将其设置为过去的时间,并且schedule_interval需要具有所需时间的cron格式。正确设置昨天的日期时间非常重要,否则触发器将不起作用。

在这种情况下,我们应该将 设置start_date为前一周的星期二,即:(2020, 8, 4)。由于每周运行一次,因此自您的开始日期起应该有 1 周的间隔。

让我们看一下以下示例,该示例显示了如何在美国东部标准时间周二至周六凌晨 04:00 运行作业:

from datetime import datetime, timedelta
from airflow import models
import pendulum
from airflow.operators import bash_operator

local_tz = pendulum.timezone("America/New_York")

default_dag_args = {
    'start_date': datetime(2020, 8, 4, 4, tzinfo=local_tz),
    'retries': 0,
}

with models.DAG(
        'Test',
        default_args=default_dag_args,
        schedule_interval='00 04 * * 2-6') as dag:
       # DAG code
    print_dag_run_conf = bash_operator.BashOperator(
        task_id='print_dag_run_conf', bash_command='echo {{ dag_run.id }}')

我建议您查看start_date文档有什么问题。

于 2020-08-11T13:07:38.403 回答