10

我在 UTC+4 时区,所以当 Airflow 触发每晚的 ETL 时,这里已经是凌晨 4:00。如何告诉 Airflow 在 ds-1 日 20:00 触发第 ds 日的运行,但使用 ds=ds?

根据文档,强烈建议将所有服务器保持在 UTC,这就是我正在寻找应用程序级解决方案的原因。

编辑:一个 hacky 解决方案是将其定义为每天晚上 20:00 运行,因此是“前一天”,然后在工作中使用tomorrow_ds而不是使用ds。但这在 Airflow UI 上看起来仍然很奇怪,因为这将显示 UTC 执行时间。

4

3 回答 3

14

计划间隔也可以是“cron 表达式”,这意味着您可以轻松地在 20:00 UTC 运行它。再加上“user_defined_filters”意味着您可以通过一些技巧获得您想要的行为:

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime

import pytz
tz = pytz.timezone('Asia/Dubai')


def localize_utc_tz(d):
    return tz.fromutc(d)

default_args = {
    'start_date': datetime(2017, 11, 8),
}
dag = DAG(
    'plus_4_utc',
    default_args=default_args,
    schedule_interval='0 20 * * *',
    user_defined_filters={
        'localtz': localize_utc_tz,
    },
)
task = BashOperator(
        task_id='task_for_testing_file_log_handler',
        dag=dag,
        bash_command='echo UTC {{ ts }}, Local {{ execution_date | localtz }} next {{ next_execution_date | localtz }}',
)

这输出:

UTC 2017-11-08T20:00:00,本地 2017-11-09 00:00:00+04:00 下一个 2017-11-10 00:00:00+04:00

您必须小心使用的变量的“类型”。例如dsandts是字符串,而不是日期时间对象,这意味着过滤器不会对它们起作用

于 2017-11-10T16:38:37.310 回答
1

我想出了同样的问题。我有每天、每小时、半小时的工作。

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
import pendulum

local_tz = pendulum.timezone("Asia/Calcutta")

args = {
    'owner': 'ganesh',
    'depends_on_past': False,
    'start_date': datetime(2020, 3, 25, tzinfo=local_tz),
    'email': ['abcd@test.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    dag_id='test1',
    default_args=args,
    schedule_interval='30 00 * * *'
    )

first_date = BashOperator(
    task_id='first_date'
    ,
    bash_command='date'
    , dag=dag, env=None, output_encoding='utf-8')

second_date = BashOperator(
    task_id='second_date'
    ,
    bash_command='echo date'
    , dag=dag, env=None, output_encoding='utf-8')

first_date >> second_date



于 2020-03-27T19:04:22.470 回答
0

您可以编写一个 python 实用程序,将您的基于 tz 的时间表重写为 UTC? https://github.com/bloomberg/tzcron/blob/master/tzcron.py

编辑:最近的提交使 Airflow Timezone 意识到: https ://github.com/apache/incubator-airflow/commit/f1ab56cc6ad3b9419af94aaa333661c105185883

于 2017-11-07T11:03:18.623 回答