29

有没有办法在 Airflow 中创建一个用户定义的宏,它本身是从其他宏计算的?

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# Daily DAG (cron '0 21 * * *') that registers a user-defined macro named
# ``next_execution_date``.
dag = DAG(
    'simple',
    schedule_interval='0 21 * * *',
    user_defined_macros={
        # NOTE: the macro value is a plain string that itself contains Jinja
        # syntax. As the rendered output of this example shows, the string is
        # substituted verbatim and is NOT rendered a second time.
        'next_execution_date': '{{ dag.following_schedule(execution_date) }}',
    },
)

# The templated bash command references the user-defined macro by name.
task = BashOperator(
    task_id='bash_op',
    bash_command='echo "{{ next_execution_date }}"',
    dag=dag,
)

这里的用例是将 Airflow v1.8 新增的 next_execution_date 宏反向移植到 Airflow v1.7 中。不幸的是,这个模板在渲染时并没有展开宏:

$ airflow render simple bash_op 2017-08-09 21:00:00
    # ----------------------------------------------------------
    # property: bash_command
    # ----------------------------------------------------------
    echo "{{ dag.following_schedule(execution_date) }}"
4

4 回答 4

42

以下是一些解决方案:

1. 覆盖BashOperator以向上下文添加一些值

class NextExecutionDateAwareBashOperator(BashOperator):
    """BashOperator that exposes ``next_execution_date`` to its templates.

    The value is computed from the ``dag`` and ``execution_date`` entries of
    the template context and injected into that context right before a
    templated field is rendered.
    """

    def render_template(self, attr, content, context):
        """Add ``next_execution_date`` to ``context`` and render as usual.

        NOTE(review): assumes the parent's
        ``render_template(attr, content, context)`` signature (Airflow 1.x);
        confirm against the installed Airflow version.

        :param attr: name of the templated attribute being rendered
        :param content: template content of that attribute
        :param context: template context dict; mutated in place
        :return: whatever the parent ``render_template`` returns
        """
        dag = context['dag']
        execution_date = context['execution_date']
        context['next_execution_date'] = dag.following_schedule(execution_date)

        # Delegate to the parent's method of the SAME name. The previous code
        # called ``render_templates`` (plural), which is not the method being
        # overridden here.
        # In python 2 use instead:
        # return super(NextExecutionDateAwareBashOperator, self).render_template(attr, content, context)
        return super().render_template(attr, content, context)

这种方法的好处是:您可以在自定义运算符中捕获一些重复的代码。

不好的部分:在渲染模板字段之前,您必须编写一个自定义运算符来向上下文添加值。

2. 在用户定义的宏中进行计算

宏不一定是值,它们也可以是函数。

在你的 DAG 中:

def compute_next_execution_date(dag, execution_date):
    """Return the schedule date that follows ``execution_date`` for ``dag``.

    Thin wrapper around ``dag.following_schedule`` so that it can be
    registered as a user-defined macro and called from a template.
    """
    following = dag.following_schedule(execution_date)
    return following

# Same daily DAG, but the user-defined macro is now a callable rather than a
# template string, so the computation happens when the template calls it.
dag = DAG(
    'simple',
    schedule_interval='0 21 * * *',
    user_defined_macros={
        'next_execution_date': compute_next_execution_date,
    },
)

# The template invokes the macro explicitly, passing ``dag`` and
# ``execution_date`` from the template context as arguments.
task = BashOperator(
    task_id='bash_op',
    bash_command='echo "{{ next_execution_date(dag, execution_date) }}"',
    dag=dag,
)

好的部分:您可以定义可重用函数来处理运行时可用的值(XCom 值、作业实例属性、任务实例属性等),并使您的函数结果可用于呈现模板。

不好的部分(但不是那么烦人):您必须在需要的每个 dag 中导入这样的函数作为用户定义的宏。

3. 直接在你的模板中调用你的语句

这个解决方案是最简单的(正如Ardan 的回答所提到的),在你的情况下可能是一个好的解决方案。

# No user-defined macro at all: the expression is written directly in the
# templated field and evaluated when the command is rendered.
BashOperator(
    task_id='bash_op',
    bash_command='echo "{{ dag.following_schedule(execution_date) }}"',
    dag=dag,
)

非常适合像这样的简单调用。还有其他一些对象可以直接作为宏使用(如 task、task_instance 等);甚至可以使用一些标准模块(例如 macros.time 等)。

于 2017-11-29T14:01:46.263 回答
8

我会投票支持制作 Airflow 插件来注入您的预定义宏。使用此方法,您可以在任何运算符中使用预定义的宏,而无需声明任何内容。

下面是我们正在使用的一些自定义宏。使用示例: {{ macros.dagtz_next_execution_date(ti) }}

from airflow.plugins_manager import AirflowPlugin
from datetime import datetime, timedelta
from airflow.utils.db import provide_session
from airflow.models import DagRun
import pendulum


@provide_session
def _get_dag_run(ti, session=None):
    """Look up the DagRun that the TaskInstance ``ti`` belongs to.

    Args:
        ti (TaskInstance): the task instance whose DagRun is wanted
        session (optional): database session used for the query; presumably
            supplied by ``@provide_session`` when the caller omits it

    Returns:
        DagRun or None: the first DagRun whose ``dag_id`` and
        ``execution_date`` match the task instance, or None when the task has
        no ``dag`` attribute or no row matches
    """
    task = ti.task
    if not hasattr(task, 'dag'):
        return None

    matching_runs = session.query(DagRun).filter_by(
        dag_id=task.dag.dag_id,
        execution_date=ti.execution_date)
    found = matching_runs.first()

    # Detach all objects from the session, then commit.
    session.expunge_all()
    session.commit()
    return found


def ds_add_no_dash(ds, days):
    """
    Add or subtract days from a YYYYMMDD date string.

    :param ds: anchor date in ``YYYYMMDD`` format to add to
    :type ds: str
    :param days: number of days to add to the ds, you can use negative values;
        a falsy value (``0`` or ``None``) leaves the date unchanged
    :type days: int
    :return: the shifted date, again in ``YYYYMMDD`` format
    :rtype: str
    :raises ValueError: if ``ds`` does not match ``YYYYMMDD``

    >>> ds_add_no_dash('20150101', 5)
    '20150106'
    >>> ds_add_no_dash('20150106', -5)
    '20150101'
    """
    # Keep the parsed value in its own name instead of rebinding the ``ds``
    # string parameter to a datetime.
    anchor = datetime.strptime(ds, '%Y%m%d')
    if days:
        anchor = anchor + timedelta(days)
    # isoformat() always zero-pads the year to four digits, so the first ten
    # characters are exactly ``YYYY-MM-DD``.
    return anchor.isoformat()[:10].replace('-', '')


def dagtz_execution_date(ti):
    """Return the execution date of ``ti`` expressed in the DAG's timezone.

    Args:
        ti (TaskInstance): the TaskInstance object

    Returns:
        pendulum obj: ``ti.execution_date`` wrapped with ``pendulum.instance``
        and converted to ``ti.task.dag.timezone``
    """
    return pendulum.instance(ti.execution_date).in_timezone(ti.task.dag.timezone)


def dagtz_next_execution_date(ti):
    """Return the next execution date of ``ti`` expressed in the DAG's timezone.

    Args:
        ti (TaskInstance): the TaskInstance object

    Returns:
        pendulum obj: next execution date converted to ``ti.task.dag.timezone``
    """
    dag_run = _get_dag_run(ti)

    # A manually triggered run is not tied to the schedule, so a "next"
    # schedule date is meaningless for it; fall back to the execution date
    # itself, mirroring how execution_date is set for manual triggers.
    manually_triggered = dag_run and dag_run.external_trigger
    upcoming = (
        ti.execution_date
        if manually_triggered
        else ti.task.dag.following_schedule(ti.execution_date)
    )

    return pendulum.instance(upcoming).in_timezone(ti.task.dag.timezone)


def dagtz_next_ds(ti):
    """Return the next execution date of ``ti`` (DAG timezone) as ``YYYY-MM-DD``."""
    return dagtz_next_execution_date(ti).strftime('%Y-%m-%d')


def dagtz_next_ds_nodash(ti):
    """Return the next execution date of ``ti`` (DAG timezone) as ``YYYYMMDD``."""
    return dagtz_next_ds(ti).replace('-', '')


def dagtz_prev_execution_date(ti):
    """Return the previous execution date of ``ti`` expressed in the DAG's timezone.

    Args:
        ti (TaskInstance): the TaskInstance object

    Returns:
        pendulum obj: previous execution date converted to
        ``ti.task.dag.timezone``
    """
    dag_run = _get_dag_run(ti)

    # A manually triggered run is not tied to the schedule, so a "previous"
    # schedule date is meaningless for it; fall back to the execution date
    # itself, mirroring how execution_date is set for manual triggers.
    manually_triggered = dag_run and dag_run.external_trigger
    earlier = (
        ti.execution_date
        if manually_triggered
        else ti.task.dag.previous_schedule(ti.execution_date)
    )

    return pendulum.instance(earlier).in_timezone(ti.task.dag.timezone)


def dagtz_prev_ds(ti):
    """Return the previous execution date of ``ti`` (DAG timezone) as ``YYYY-MM-DD``."""
    return dagtz_prev_execution_date(ti).strftime('%Y-%m-%d')


def dagtz_prev_ds_nodash(ti):
    """Return the previous execution date of ``ti`` (DAG timezone) as ``YYYYMMDD``."""
    return dagtz_prev_ds(ti).replace('-', '')


# Defining the plugin class
class AirflowTestPlugin(AirflowPlugin):
    """Airflow plugin named ``custom_macros`` that registers the macros below."""

    name = "custom_macros"
    macros = [
        dagtz_execution_date,
        ds_add_no_dash,
        dagtz_next_execution_date,
        dagtz_next_ds,
        dagtz_next_ds_nodash,
        dagtz_prev_execution_date,
        dagtz_prev_ds,
        dagtz_prev_ds_nodash,
    ]
于 2019-02-28T09:52:56.207 回答
4

user_defined_macros 默认情况下不会作为模板处理。如果您想在 user_defined_macro 中保留模板(或者在 params 变量中使用模板),您始终可以手动重新运行模板渲染函数:

class DoubleTemplatedBashOperator(BashOperator):
    """BashOperator that re-runs template rendering just before execution."""

    def pre_execute(self, context):
        """Call ``render_templates()`` on the task instance found in ``context``."""
        task_instance = context['ti']
        task_instance.render_templates()

这适用于不引用其他参数或 UDM 的模板。这样,您就可以拥有“两层深”的模板。

或者将您的 UDM 直接放在 BashOperator 的命令中(最简单的解决方案):

# The expression is written directly in the templated command instead of
# going through a user-defined macro.
BashOperator(
    task_id='bash_op',
    bash_command='echo "{{ dag.following_schedule(execution_date) }}"',
    dag=dag,
)
于 2017-08-29T14:21:06.333 回答
0

这些都不适合我,所以这就是我所做的,我使用了user_defined_macros但我将所有模板变量传递给我的宏,然后我使用 jinja 来渲染结果

# Names of the template context variables that are forwarded to the
# ``config`` macro, in the order they appear in the generated expression.
_MACRO_CONTEXT_KEYS = (
    'data_interval_start',
    'data_interval_end',
    'ds',
    'ds_nodash',
    'ts',
    'ts_nodash_with_tz',
    'ts_nodash',
    'prev_data_interval_start_success',
    'prev_data_interval_end_success',
    'dag',
    'task',
    'macros',
    'task_instance',
    'ti',
    'params',
    'conn',
    'task_instance_key_str',
    'conf',
    'run_id',
    'dag_run',
    'test_mode',
)

# Template expression that calls the ``config`` macro with a dict literal
# mapping each context variable name to its value: config({"ds": ds, ...}).
MACRO_CONFIG = 'config({' + ', '.join(
    '"{0}": {0}'.format(key) for key in _MACRO_CONTEXT_KEYS
) + '})'

def config_macro(context):
    """Wrap ``context`` in a ``FunctionThatReturnsTemplates`` and return it.

    Registered as the ``config`` user-defined macro, so templates call it
    with a dict of template variables.
    """
    wrapped = FunctionThatReturnsTemplates(context)
    return wrapped

with DAG(
        'my-dag-id',
        schedule_interval=None,
        start_date=days_ago(1),
        user_defined_macros={'config': config_macro}
) as dag:
...

def config_macro_template(attr_name):
    """Build the Jinja expression ``{{<MACRO_CONFIG>.<attr_name>}}``.

    ``attr_name`` is appended verbatim after a dot, so it may be an attribute
    name or a call such as ``'function(args)'``.
    """
    pieces = ('{{', MACRO_CONFIG, '.', attr_name, '}}')
    return ''.join(pieces)

class FunctionThatReturnsTemplates(object):
    """Object whose string attributes and string method results are rendered
    as templates against a stored context.

    Attribute access is intercepted by ``__getattribute__``:

    * a callable attribute is returned wrapped, so that a ``str`` result of
      the call is passed through ``Template(...).render(**context)``;
    * a ``str`` attribute is rendered the same way before being returned;
    * anything else is returned unchanged.

    ``Template`` is presumably ``jinja2.Template`` imported elsewhere in the
    file -- TODO confirm.
    """

    def __init__(self, context):
        """Store the template context (a mapping of variable name to value)."""
        logging.info('from sampling pipeline context')
        logging.info(context)
        self.context = context

    def __getattribute__(self, name):
        """Return attribute ``name``, rendering string values as templates."""
        attr = object.__getattribute__(self, name)

        # The context itself and dunder attributes (``__class__``,
        # ``__doc__``, ``__dict__``, ...) belong to the object protocol and
        # must be returned untouched rather than wrapped or rendered.
        if name == 'context' or (name.startswith('__') and name.endswith('__')):
            return attr

        def render(text):
            # Read the context lazily and bypass this hook, so rendering does
            # not re-enter __getattribute__ for ``context``.
            context = object.__getattribute__(self, 'context')
            return Template(text).render(**context)

        logging.info('attr %s has type %s', name, type(attr))

        if callable(attr):
            def render_result(*args, **kwargs):
                # Use ``name`` for logging: not every callable has __name__.
                logging.info('before calling %s', name)
                result = attr(*args, **kwargs)
                logging.info('done calling %s', name)
                # ``str`` is the only text type on Python 3; the former
                # ``unicode`` check raised NameError for non-string results.
                if isinstance(result, str):
                    return render(result)
                return result

            return render_result

        if isinstance(attr, str):
            result = render(attr)
            logging.info('rendered attr %s: %s', name, result)
            return result

        return attr
...

    my_task = SomeOperator(
        templated_field=config_macro_template('function(args)'),
        task_id='my-task-id'
    )
于 2021-11-10T03:09:23.373 回答