I am trying to build a workflow using Apache Airflow. Basically, I have installed Airflow manually into my own anaconda kernel on the server.
This is how I run a simple DAG:
export AIRFLOW_HOME=~/airflow/airflow_home # my airflow home
export AIRFLOW=~/.conda/.../lib/python2.7/site-packages/airflow/bin
export PATH=~/.conda/.../bin:$AIRFLOW:$PATH # my kernel
When I do the same thing with airflow test, it executes the given task in isolation. For example, in dag1: task1 >> task2
airflow test dag1 task2 2017-06-22
I expected it to run task1 first and then task2. But it only runs task2 on its own.
Do you have any ideas about this? Thanks a lot!
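To make my expectation concrete, here is a plain-Python toy of the order I thought would apply (my own illustration with made-up names, not Airflow code):

```python
# Toy model of the task order I expected (NOT Airflow's API).
# Each task lists its upstream dependencies; run_with_upstream()
# executes a task only after recursively running everything upstream.

upstream = {
    'task1': [],
    'task2': ['task1'],  # i.e. task1 >> task2
}

def run_with_upstream(task, executed=None):
    """Run `task` after all of its upstream tasks, depth-first."""
    if executed is None:
        executed = []
    for dep in upstream[task]:
        run_with_upstream(dep, executed)
    if task not in executed:
        executed.append(task)
    return executed

print(run_with_upstream('task2'))  # ['task1', 'task2']
```

That is, asking for task2 should first pull in task1; instead, airflow test behaves as if task2 had no upstream at all.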
Here is my code:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'txuantu',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['tran.xuantu@axa.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG(
    'tutorial', default_args=default_args, schedule_interval=timedelta(1))

def python_op1(ds, **kwargs):
    print(ds)
    return 0

def python_op2(ds, **kwargs):
    print(str(kwargs))
    return 0

# t1, t2 and t3 are examples of tasks created by instantiating operators
# t1 = BashOperator(
#     task_id='bash_operator',
#     bash_command='echo {{ ds }}',
#     dag=dag)

t1 = PythonOperator(
    task_id='python_operator1',
    python_callable=python_op1,
    # provide_context=True,
    dag=dag)

t2 = PythonOperator(
    task_id='python_operator2',
    python_callable=python_op2,
    # provide_context=True,
    dag=dag)

t2.set_upstream(t1)
Airflow: v1.8.0, using the SequentialExecutor with SQLite. I also tried:
airflow run tutorial python_operator2 2015-06-01
And here is the error message:
[2017-06-28 22:49:15,336] {models.py:167} INFO - Filling up the DagBag from /home/txuantu/airflow/airflow_home/dags
[2017-06-28 22:49:16,069] {base_executor.py:50} INFO - Adding to queue: airflow run tutorial python_operator2 2015-06-01T00:00:00 --mark_success --local -sd DAGS_FOLDER/tutorial.py
[2017-06-28 22:49:16,072] {sequential_executor.py:40} INFO - Executing command: airflow run tutorial python_operator2 2015-06-01T00:00:00 --mark_success --local -sd DAGS_FOLDER/tutorial.py
[2017-06-28 22:49:16,765] {models.py:167} INFO - Filling up the DagBag from /home/txuantu/airflow/airflow_home/dags/tutorial.py
[2017-06-28 22:49:16,986] {base_task_runner.py:112} INFO - Running: ['bash', '-c', u'airflow run tutorial python_operator2 2015-06-01T00:00:00 --mark_success --job_id 1 --raw -sd DAGS_FOLDER/tutorial.py']
[2017-06-28 22:49:17,373] {base_task_runner.py:95} INFO - Subtask: [2017-06-28 22:49:17,373] {__init__.py:57} INFO - Using executor SequentialExecutor
[2017-06-28 22:49:17,694] {base_task_runner.py:95} INFO - Subtask: [2017-06-28 22:49:17,693] {models.py:167} INFO - Filling up the DagBag from /home/txuantu/airflow/airflow_home/dags/tutorial.py
[2017-06-28 22:49:17,899] {base_task_runner.py:95} INFO - Subtask: [2017-06-28 22:49:17,899] {models.py:1120} INFO - Dependencies not met for <TaskInstance: tutorial.python_operator2 2015-06-01 00:00:00 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'successes': 0, 'failed': 0, 'upstream_failed': 0, 'skipped': 0, 'done': 0}, upstream_task_ids=['python_operator1']
[2017-06-28 22:49:22,011] {jobs.py:2083} INFO - Task exited with return code 0
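For what it's worth, my reading of the "Trigger Rule" line above, sketched as a minimal stand-alone check (my own illustration, not Airflow's internals):

```python
# Minimal sketch of the 'all_success' trigger rule reported in the log
# (plain Python, not Airflow's implementation).

def all_success_met(upstream_states):
    """True only when every upstream task instance has succeeded."""
    return all(state == 'success' for state in upstream_states.values())

# The log shows upstream_tasks_state with 0 successes for the single
# upstream task (python_operator1 never ran), so the rule fails:
print(all_success_met({'python_operator1': None}))       # False
print(all_success_met({'python_operator1': 'success'}))  # True
```

So python_operator2 is blocked because python_operator1 has no recorded success, yet nothing I run seems to schedule python_operator1 first.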