
I am trying to create a workflow using Apache Airflow. Basically, I have manually installed Airflow into my own Anaconda environment (kernel) on the server.

This is how I run a simple DAG:

export AIRFLOW_HOME=~/airflow/airflow_home # my airflow home
export AIRFLOW=~/.conda/.../lib/python2.7/site-packages/airflow/bin
export PATH=~/.conda/.../bin:$AIRFLOW:$PATH # my kernel
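
As a sanity check (not shown above), I can confirm that the DAG files under $AIRFLOW_HOME/dags are picked up with:

airflow list_dags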

When I run a task with airflow test, it only runs that particular task on its own. For example, in dag1: task1 >> task2

airflow test dag1 task2 2017-06-22

I expected it to run task1 first and then task2, but it only runs task2 on its own.

Do you have any ideas about this? Thank you very much!

Here is my code:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta


default_args = {
    'owner': 'txuantu',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'email': ['tran.xuantu@axa.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG(
    'tutorial', default_args=default_args, schedule_interval=timedelta(1))


def python_op1(ds, **kwargs):
    print(ds)
    return 0


def python_op2(ds, **kwargs):
    print(str(kwargs))
    return 0

# t1 and t2 are examples of tasks created by instantiating operators
# t1 = BashOperator(
#     task_id='bash_operator',
#     bash_command='echo {{ ds }}',
#     dag=dag)
t1 = PythonOperator(
    task_id='python_operator1',
    python_callable=python_op1,
    # provide_context=True,
    dag=dag)


t2 = PythonOperator(
    task_id='python_operator2',
    python_callable=python_op2,
    # provide_context=True,
    dag=dag)

t2.set_upstream(t1)

Airflow: v1.8.0, using the SequentialExecutor with SQLite.

airflow run tutorial python_operator2 2015-06-01

This is the error message I get:

[2017-06-28 22:49:15,336] {models.py:167} INFO - Filling up the DagBag from /home/txuantu/airflow/airflow_home/dags
[2017-06-28 22:49:16,069] {base_executor.py:50} INFO - Adding to queue: airflow run tutorial python_operator2 2015-06-01T00:00:00 --mark_success --local -sd DAGS_FOLDER/tutorial.py
[2017-06-28 22:49:16,072] {sequential_executor.py:40} INFO - Executing command: airflow run tutorial python_operator2 2015-06-01T00:00:00 --mark_success --local -sd DAGS_FOLDER/tutorial.py
[2017-06-28 22:49:16,765] {models.py:167} INFO - Filling up the DagBag from /home/txuantu/airflow/airflow_home/dags/tutorial.py
[2017-06-28 22:49:16,986] {base_task_runner.py:112} INFO - Running: ['bash', '-c', u'airflow run tutorial python_operator2 2015-06-01T00:00:00 --mark_success --job_id 1 --raw -sd DAGS_FOLDER/tutorial.py']
[2017-06-28 22:49:17,373] {base_task_runner.py:95} INFO - Subtask: [2017-06-28 22:49:17,373] {__init__.py:57} INFO - Using executor SequentialExecutor
[2017-06-28 22:49:17,694] {base_task_runner.py:95} INFO - Subtask: [2017-06-28 22:49:17,693] {models.py:167} INFO - Filling up the DagBag from /home/txuantu/airflow/airflow_home/dags/tutorial.py
[2017-06-28 22:49:17,899] {base_task_runner.py:95} INFO - Subtask: [2017-06-28 22:49:17,899] {models.py:1120} INFO - Dependencies not met for <TaskInstance: tutorial.python_operator2 2015-06-01 00:00:00 [None]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'successes': 0, 'failed': 0, 'upstream_failed': 0, 'skipped': 0, 'done': 0}, upstream_task_ids=['python_operator1']
[2017-06-28 22:49:22,011] {jobs.py:2083} INFO - Task exited with return code 0

2 Answers


If you only want to run python_operator2, you should execute:

airflow run tutorial python_operator2 2015-06-01 --ignore_dependencies
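
Alternatively, you could satisfy the dependency first by running the upstream task for the same execution date, for example (assuming python_operator1 succeeds):

airflow run tutorial python_operator1 2015-06-01
airflow run tutorial python_operator2 2015-06-01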

If you want to execute the entire DAG and run both tasks, use trigger_dag:

airflow trigger_dag tutorial

For reference, airflow test will "run a task without checking for dependencies."

Documentation for all three commands can be found at https://airflow.incubator.apache.org/cli.html.
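
As an aside, if the goal is to run both tasks in dependency order for a past date such as 2015-06-01, a backfill over that single day should also work, for example:

airflow backfill tutorial -s 2015-06-01 -e 2015-06-01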

Answered 2017-06-27T13:52:03.763

Finally, I found the answer to my question. Basically, I thought Airflow loaded things lazily, but it seems it doesn't. So the answer is that instead of:

t2.set_upstream(t1)

it should be:

t1.set_downstream(t2)
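
For reference, Airflow 1.8 also supports the bitshift shorthand, which should declare the same downstream relationship and reads in execution order:

t1 >> t2  # equivalent to t1.set_downstream(t2)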
Answered 2017-07-03T09:27:12.237