我刚刚开始使用 Airbnb 的气流,我仍然不清楚如何/何时完成回填。
具体来说,有两个用例让我感到困惑:
如果我运行
airflow scheduler
几分钟,停止一分钟,然后再次重新启动,我的 DAG 似乎在前 30 秒左右运行了额外的任务,然后它继续正常运行(每 10 秒运行一次)。这些额外的任务是否“回填”了在早期运行中无法完成的任务?如果是这样,我将如何告诉气流不要回填这些任务?如果我运行
airflow scheduler
几分钟,然后运行airflow clear MY_tutorial
,然后重新启动airflow scheduler
,它似乎运行了大量的额外任务。这些任务是否也以某种方式“回填”任务?或者我错过了什么。
目前,我有一个非常简单的 dag:
default_args = {
'owner': 'me',
'depends_on_past': False,
'start_date': datetime(2016, 10, 4),
'email': ['airflow@airflow.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG(
'MY_tutorial', default_args=default_args, schedule_interval=timedelta(seconds=10))
# t1, t2 and t3 are examples of tasks created by instantiating operators
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
t2 = BashOperator(
task_id='sleep',
bash_command='sleep 5',
retries=3,
dag=dag)
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 8)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
t3 = BashOperator(
task_id='templated',
bash_command=templated_command,
params={'my_param': 'Parameter I passed in'},
dag=dag)
second_template = """
touch ~/airflow/logs/test
echo $(date) >> ~/airflow/logs/test
"""
t4 = BashOperator(
task_id='write_test',
bash_command=second_template,
dag=dag)
t1.set_upstream(t4)
t2.set_upstream(t1)
t3.set_upstream(t1)
我在气流配置中更改的唯一两件事是
- 我从使用 sqlite db 更改为使用 postgres db
- 我正在使用 a
CeleryExecutor
而不是 aSequentialExecutor
非常感谢你的帮助!