
I need to have pipelines that can be executed manually or programmatically. Can I use Airflow for that? It looks like right now every workflow has to be tied to a schedule.


3 Answers


Just set schedule_interval to None when you create the DAG:

dag = DAG('workflow_name',
          template_searchpath='path',
          schedule_interval=None,
          default_args=default_args)

From the Airflow Manual:

Each DAG may or may not have a schedule, which informs how DAG Runs are created. schedule_interval is defined as a DAG argument, and receives preferably a cron expression as a str, or a datetime.timedelta object.

The manual then goes on to list some cron "presets", one of which is None.
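With no schedule, the scheduler will never start this DAG on its own, so you trigger it yourself: from the CLI with airflow trigger_dag workflow_name (airflow dags trigger workflow_name on Airflow 2.x), or from Python code. Below is a minimal sketch of a programmatic trigger, assuming the Airflow 1.10 experimental trigger_dag helper; the run_id and conf values are placeholders:

# Sketch: trigger the unscheduled DAG from Python.
# Assumes the Airflow 1.10 experimental API; the module moved in Airflow 2.x.
from airflow.api.common.experimental.trigger_dag import trigger_dag

trigger_dag(
    dag_id='workflow_name',          # the DAG defined above
    run_id='manual__example_run',    # placeholder run id
    conf={'message': 'hello'},       # optional payload, exposed as dag_run.conf
)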

Answered 2018-05-23T22:21:09.880

In Airflow, every DAG needs a start date and a schedule interval*, for example hourly:

from datetime import datetime, timedelta

from airflow import DAG

dag = DAG(
    dag_id='my_dag',
    schedule_interval=timedelta(hours=1),
    start_date=datetime(2018, 5, 23),
)

(If it did not have a schedule, how would it know when to run?)

As an alternative to a cron schedule, you can set the schedule to @once so that the DAG runs only once.
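A minimal sketch of that @once variant, with a placeholder dag_id:

from datetime import datetime

from airflow import DAG

dag = DAG(
    dag_id='run_once_dag',         # placeholder name
    schedule_interval='@once',     # the scheduler creates exactly one DAG run
    start_date=datetime(2018, 5, 23),
)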

*One exception: you can omit the schedule for externally triggered DAGs, because Airflow will not schedule them itself.

That said, if you omit the schedule you will need to trigger the DAG externally in some way. If you want to be able to invoke a DAG programmatically, for instance when a separate condition occurs in another DAG, you can do that with the TriggerDagRunOperator. You may also hear this idea referred to as externally triggered DAGs.

Here is a usage example from the Airflow example DAGs:

File 1 - example_trigger_controller_dag.py

"""This example illustrates the use of the TriggerDagRunOperator. There are 2
entities at work in this scenario:
1. The Controller DAG - the DAG that conditionally executes the trigger
2. The Target DAG - DAG being triggered (in example_trigger_target_dag.py)

This example illustrates the following features :
1. A TriggerDagRunOperator that takes:
  a. A python callable that decides whether or not to trigger the Target DAG
  b. An optional params dict passed to the python callable to help in
     evaluating whether or not to trigger the Target DAG
  c. The id (name) of the Target DAG
  d. The python callable can add contextual info to the DagRun created by
     way of adding a Pickleable payload (e.g. dictionary of primitives). This
     state is then made available to the TargetDag
2. A Target DAG : c.f. example_trigger_target_dag.py
"""

from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from datetime import datetime

import pprint

pp = pprint.PrettyPrinter(indent=4)


def conditionally_trigger(context, dag_run_obj):
    """This function decides whether or not to Trigger the remote DAG"""
    c_p = context['params']['condition_param']
    print("Controller DAG : conditionally_trigger = {}".format(c_p))
    if context['params']['condition_param']:
        dag_run_obj.payload = {'message': context['params']['message']}
        pp.pprint(dag_run_obj.payload)
        return dag_run_obj


# Define the DAG
dag = DAG(dag_id='example_trigger_controller_dag',
          default_args={"owner": "airflow",
                        "start_date": datetime.utcnow()},
          schedule_interval='@once')


# Define the single task in this controller example DAG
trigger = TriggerDagRunOperator(task_id='test_trigger_dagrun',
                                trigger_dag_id="example_trigger_target_dag",
                                python_callable=conditionally_trigger,
                                params={'condition_param': True,
                                        'message': 'Hello World'},
                                dag=dag)

File 2 - example_trigger_target_dag.py

from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
from datetime import datetime

import pprint
pp = pprint.PrettyPrinter(indent=4)

# This example illustrates the use of the TriggerDagRunOperator. There are 2
# entities at work in this scenario:
# 1. The Controller DAG - the DAG that conditionally executes the trigger
#    (in example_trigger_controller.py)
# 2. The Target DAG - DAG being triggered
#
# This example illustrates the following features :
# 1. A TriggerDagRunOperator that takes:
#   a. A python callable that decides whether or not to trigger the Target DAG
#   b. An optional params dict passed to the python callable to help in
#      evaluating whether or not to trigger the Target DAG
#   c. The id (name) of the Target DAG
#   d. The python callable can add contextual info to the DagRun created by
#      way of adding a Pickleable payload (e.g. dictionary of primitives). This
#      state is then made available to the TargetDag
# 2. A Target DAG : c.f. example_trigger_target_dag.py

args = {
    'start_date': datetime.utcnow(),
    'owner': 'airflow',
}

dag = DAG(
    dag_id='example_trigger_target_dag',
    default_args=args,
    schedule_interval=None)


def run_this_func(ds, **kwargs):
    print("Remotely received value of {} for key=message".
          format(kwargs['dag_run'].conf['message']))


run_this = PythonOperator(
    task_id='run_this',
    provide_context=True,
    python_callable=run_this_func,
    dag=dag)


# You can also access the DagRun object in templates
bash_task = BashOperator(
    task_id="bash_task",
    bash_command='echo "Here is the message: '
                 '{{ dag_run.conf["message"] if dag_run else "" }}" ',
    dag=dag)
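You can also trigger the target DAG directly and hand it a payload, bypassing the controller: on the Airflow 1.x CLI that is airflow trigger_dag -c '{"message": "Hello World"}' example_trigger_target_dag, and the JSON passed via -c/--conf shows up in dag_run.conf just like the payload set by the TriggerDagRunOperator above.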
Answered 2018-05-23T22:28:51.303

Yes, this can be achieved by passing None as the schedule_interval in default_args.

Check the documentation on DAG Runs.

For example:

from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 12, 1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'schedule_interval': None, # Check this line 
}
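For completeness, a minimal sketch of a DAG built from these default_args; the dag_id is a placeholder, and note that the first answer above passes schedule_interval directly to the DAG constructor rather than through default_args:

from airflow import DAG

dag = DAG(
    dag_id='manual_only_dag',    # placeholder name
    default_args=default_args,
    schedule_interval=None,      # also set on the DAG itself, as in the first answer
)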
Answered 2018-05-23T22:29:01.977