5

我已安排每天运行 DAG 的执行。它可以完美运行一天。

但是,每天我不仅想重新执行当天的 {{ ds }},还想重新执行前 n 天(假设 n = 7)。

例如,在计划在“2018-01-30”运行的下一次执行中,我希望 Airflow 不仅使用执行日期“2018-01-30”运行 DAG,还希望为所有人重新运行 DAG前几天从“2018-01-23”到“2018-01-30”。

是否有一种简单的方法可以使先前的执行“无效”以便自动运行回填?

4

2 回答 2

6

您可以在循环中动态生成任务并将偏移量传递给您的操作员。

这是一个 Python 示例。

import airflow
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG

from datetime import timedelta


args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
    'schedule_interval': '0 10 * * *'
}

def check_trigger(execution_date, day_offset, **kwargs):
    target_date = execution_date - timedelta(days=day_offset)
    # use target_date

for day_offset in xrange(1, 8):
    PythonOperator(
        task_id='task_offset_' + i,
        python_callable=check_trigger,
        provide_context=True,
        dag=dag,
        op_kwargs={'day_offset' : day_offset}
    )
于 2018-01-30T18:03:22.810 回答
0

您是否考虑过让每天运行一次的 dag 仅在过去 7 天内运行您的任务?我想您将只有 7 个任务,每个任务都会产生一个与执行日期不同的日期偏移量的SubDAG 。

我认为这将使调试更容易,历史更清晰。我相信尝试回填已经执行的任务将涉及删除任务实例或将它们的状态全部设置为 NONE。然后,您仍然必须在这些 dag 运行中触发回填。当事情失败并且看起来有点混乱时,将更难跟踪。

于 2018-01-29T18:38:38.317 回答