7

我正在尝试为我的气流 jinja2 模板添加自定义过滤器。

因为我在 S3 中的文件夹就像

/年月日/

,我的目的是在变量屏幕中使用昨天_ds,如下所示:

s3://logs.web.com/AWSLogs/{{昨天_ds | get_year }}/{{昨天_ds | get_month }}/{{昨天_ds | get_day }}/

我在 PR(我认为它已经合并..)中看到,您可以在此处创建 dag 对象的 dag_args 参数中使用参数 'user_defined_filters' 来执行此操作

问题是,即使这样做,它也会显示“没有名为 get_year 的过滤器”。

这是我的代码:

dag.py

   dag = DAG(
        dag_id='dag-name',
        default_args=utils.get_dag_args(user_defined_filters=utils.get_date_filters()),
        template_searchpath=tmpl_search_path,
        schedule_interval=timedelta(days=1),
        max_active_runs=1,
        )

实用程序.py

def get_dag_args(**kwargs):
return {
    'owner'               : kwargs.get('owner', 'owner_name'),
    'depends_on_past'     : kwargs.get('depends_on_past', False),
    'start_date'          : kwargs.get('start_date', datetime.now() - timedelta(1)),
    'email'               : kwargs.get('email', ['blabla@blabla.com']),
    'retries'             : kwargs.get('retries', 5),
    'provide_context'     : kwargs.get('provide_context', True),
    'retry_delay'         : kwargs.get('retry_delay', timedelta(minutes=5)),
    'user_defined_filters': get_date_filters()
    }


def get_date_filters():
    return dict(
        get_year=lambda date_string: date_string.strftime('%Y'),
        get_month=lambda date_string: date_string.strftime('%m'),
        get_day=lambda date_string: date_string.strftime('%d'),
        )

有人看到我错在哪里了吗?谢谢!

编辑

在 dag 定义之后打印,没有显示自定义过滤器,不幸的是:(。

jinja_env = dag.get_template_env()
print(jinja_env.filters)

此外,如果我尝试将其直接添加为 DAG 对象参数,如测试@tests/models.py 中所示:

Broken DAG: [/home/ubuntu/airflow/dags/dag.py] __init__() got an unexpected keyword argument 'user_defined_filters'

编辑 2

好的,我看到的是我有 1.8.0 版本,而这个没有过滤器。有人知道如何通过 pip 下载 1.8.2rc 吗?或者我们不能?

4

2 回答 2

4

Airflow 现在支持自定义过滤器和宏

工作代码示例:

from airflow import DAG
from datetime import datetime, timedelta

def first_day_of_month(any_day):
    return any_day.replace(day=1)


def last_day_of_month(any_day):
    next_month = any_day.replace(day=28) + timedelta(days=4)  # this will never fail
    return next_month - timedelta(days=next_month.day)


def isoformat_month(any_date):
    return any_date.strftime("%Y-%m")


with DAG(
        dag_id='generate_raw_logs',
        default_args=default_args,
        schedule_interval=timedelta(minutes=120),
        catchup=False,
        user_defined_macros={
            'first_day_of_month': first_day_of_month,
            'last_day_of_month': last_day_of_month,
        },
        user_defined_filters={
            'isoformat_month': isoformat_month
        }
)
于 2018-12-26T03:57:41.573 回答
0

气流包装名称已在 pip 上更改。1.8.2rc1 可以使用 pip install apache-airflow 下载。

另外,请注意,根据邮件列表,他们目前正在将 1.8.2rc4 发布为 1.8.2。

于 2017-08-25T07:21:08.203 回答