最近我测试了很多气流,execution_date
运行时有一个问题airflow trigger_dag <my-dag>
。
我了解到这execution_date
不是我们第一次从这里想到的:
Airflow 是作为 ETL 需求的解决方案而开发的。在 ETL 世界中,您通常会汇总数据。因此,如果我想汇总 2016 年 2 月 19 日的数据,我会在格林威治标准时间 2016 年 2 月 20 日午夜进行,这将是在 2016 年 2 月 19 日的所有数据可用之后。
start_date = datetime.combine(datetime.today(),
datetime.min.time())
args = {
"owner": "xigua",
"start_date": start_date
}
dag = DAG(dag_id="hadoopprojects", default_args=args,
schedule_interval=timedelta(days=1))
wait_5m = ops.TimeDeltaSensor(task_id="wait_5m",
dag=dag,
delta=timedelta(minutes=5))
上面的代码是我日常工作流程的开始部分,第一个任务是 TimeDeltaSensor,它在实际工作前再等 5 分钟,所以这意味着我的 dag 将在2016-09-09T00:05:00
, 2016-09-10T00:05:00
... 等处触发。
在 Web UI 中,我可以看到类似的scheduled__2016-09-20T00:00:00
内容,并且任务运行在2016-09-21T00:00:00
,根据ETL
模型,这似乎是合理的。
但是有一天我的 dag 没有因未知原因被触发,所以我手动触发它,如果我在 触发它2016-09-20T00:10:00
,那么 TimeDeltaSensor 将等到2016-09-21T00:15:00
运行之前。
这不是我想要的,我希望它2016-09-20T00:15:00
不是在第二天运行,我尝试过execution_date
通过--conf '{"execution_date": "2016-09-20"}'
,但它不起作用。
我应该如何处理这个问题?
$ airflow version
[2016-09-21 17:26:33,654] {__init__.py:36} INFO - Using executor LocalExecutor
____________ _____________
____ |__( )_________ __/__ /________ __
____ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / /
___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
_/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/
v1.7.1.3